You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/09 19:07:24 UTC

svn commit: r823617 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: dht/BootStrapper.java gms/Gossiper.java locator/AbstractReplicationStrategy.java service/StorageLoadBalancer.java service/StorageService.java

Author: jbellis
Date: Fri Oct  9 17:07:24 2009
New Revision: 823617

URL: http://svn.apache.org/viewvc?rev=823617&view=rev
Log:
fix regressions
patch by jbellis reviewed by Eric Evans for CASSANDRA-477

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Oct  9 17:07:24 2009
@@ -37,6 +37,7 @@
  import org.apache.commons.lang.ArrayUtils;
 
  import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.net.io.StreamContextManager;
  import org.apache.cassandra.net.io.IStreamComplete;
@@ -208,6 +209,7 @@
 
             if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
             {
+                StorageService.instance().retrofitPorts(Arrays.asList(maxEndpoint));
                 Token<?> t = getBootstrapTokenFrom(maxEndpoint);
                 logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
                 ss.updateToken(t);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Oct  9 17:07:24 2009
@@ -95,7 +95,7 @@
     final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
     /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
     final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
-    final static int intervalInMillis_ = 1000;
+    public final static int intervalInMillis_ = 1000;
     private static Logger logger_ = Logger.getLogger(Gossiper.class);
     static Gossiper gossiper_;
 
@@ -522,6 +522,8 @@
                     {
                         reqdEndPointState = new EndPointState(epState.getHeartBeatState());
                     }
+                    if (logger_.isTraceEnabled())
+                        logger_.trace("Adding state " + key + ": " + appState.getState());
                     reqdEndPointState.addApplicationState(key, appState);
                 }
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Fri Oct  9 17:07:24 2009
@@ -73,7 +73,7 @@
      * This method changes the ports of the endpoints from
      * the control port to the storage ports.
     */
-    protected void retrofitPorts(List<EndPoint> eps)
+    public void retrofitPorts(List<EndPoint> eps)
     {
         for ( EndPoint ep : eps )
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Fri Oct  9 17:07:24 2009
@@ -351,8 +351,9 @@
 
     public void startBroadcasting()
     {
-        /* starts a load timer thread */
-        loadTimer_.schedule(new LoadDisseminator(), BROADCAST_INTERVAL, BROADCAST_INTERVAL);
+        // Send the first broadcast "right away" (i.e., after 2 gossip heartbeats, by which time we should have someone to talk to);
+        // after that, send one every BROADCAST_INTERVAL.
+        loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL);
     }
 
     /** wait for node information to be available.  if the rest of the cluster just came up,

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Oct  9 17:07:24 2009
@@ -226,12 +226,12 @@
         MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
         MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
+        MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
         MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
         MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
         MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
-        MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
-        
+
         StageManager.registerStage(StorageService.mutationStage_,
                                    new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
         StageManager.registerStage(StorageService.readStage_,
@@ -266,15 +266,24 @@
 
         StorageLoadBalancer.instance().startBroadcasting();
 
+        // We have to start the gossip service before we can see any info on other nodes.  This is necessary
+        // for bootstrap to get the load info it needs.
+        // (We won't be part of the storage ring, though, until we add a nodeId to our state, below.)
+        Gossiper.instance().register(this);
+        Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
+
         if (isBootstrapMode)
         {
-            BootStrapper.startBootstrap();
+            BootStrapper.startBootstrap(); // handles token update
+        }
+        else
+        {
+            tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
         }
 
-        Gossiper.instance().register(this);
-        Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
-        /* Make sure this token gets gossiped around. */
-        tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
+        // Gossip my token.
+        // Note that before we do this, we have (a) finalized what the token is actually going to be, and
+        // (b) added a bootstrap state (done by startBootstrap).
         ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
         Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
     }
@@ -935,6 +944,11 @@
         return nodePicker_.getHintedStorageEndPoints(partitioner_.getToken(key));
     }
 
+    public void retrofitPorts(List<EndPoint> eps)
+    {
+        nodePicker_.retrofitPorts(eps);
+    }
+
     /**
      * This function finds the most suitable endpoint given a key.
      * It checks for locality and alive test.