You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/04/09 00:44:04 UTC

[2/6] git commit: Put nodes into hibernate when join_ring is false.

Put nodes into hibernate when join_ring is false.

Patch by brandonwilliams, reviewed by Tyler Hobbs for CASSANDRA-6961


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d7ae7942
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d7ae7942
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d7ae7942

Branch: refs/heads/cassandra-2.1
Commit: d7ae79427e1153e0065933cdd9df43b08da3fabc
Parents: b1a3070
Author: Brandon Williams <br...@apache.org>
Authored: Tue Apr 8 17:31:38 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Apr 8 17:32:22 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/StorageService.java       | 134 +++++++++++--------
 1 file changed, 76 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ae7942/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1c4fd27..eec4712 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -42,6 +42,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.slf4j.Logger;
@@ -192,6 +193,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private final ObjectName jmxObjectName;
 
+    private Collection<Token> bootstrapTokens = null;
+
     public void finishBootstrapping()
     {
         isBootstrapMode = false;
@@ -606,12 +609,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }, "StorageServiceShutdownHook");
         Runtime.getRuntime().addShutdownHook(drainOnShutdown);
 
+        prepareToJoin();
         if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
         {
             joinTokenRing(delay);
         }
         else
         {
+            Collection<Token> tokens = SystemKeyspace.getSavedTokens();
+            if (!tokens.isEmpty())
+            {
+                tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+                // order is important here, the gossiper can fire in between adding these two states.  It's ok to send TOKENS without STATUS, but *not* vice versa.
+                Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.hibernate(true));
+            }
             logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
         }
     }
@@ -621,52 +633,58 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
     }
 
-    private void joinTokenRing(int delay) throws ConfigurationException
+    private void prepareToJoin() throws ConfigurationException
     {
-        joined = true;
-
-        Collection<Token> tokens = null;
-        Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
-
-        if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
-            throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
-        if (DatabaseDescriptor.isReplacing())
-        {
-            if (!DatabaseDescriptor.isAutoBootstrap())
-                throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
-            tokens = prepareReplacementInfo();
-            appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
-            appStates.put(ApplicationState.TOKENS, valueFactory.tokens(tokens));
-        }
-        else if (shouldBootstrap())
+        if (!joined)
         {
-            checkForEndpointCollision();
-        }
+            Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
 
-        // have to start the gossip service before we can see any info on other nodes.  this is necessary
-        // for bootstrap to get the load info it needs.
-        // (we won't be part of the storage ring though until we add a counterId to our state, below.)
-        // Seed the host ID-to-endpoint map with our own ID.
-        getTokenMetadata().updateHostId(SystemKeyspace.getLocalHostId(), FBUtilities.getBroadcastAddress());
-        appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
-        appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemKeyspace.getLocalHostId()));
-        appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
-        appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
-        logger.info("Starting up server gossip");
-        Gossiper.instance.register(this);
-        Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
-        // gossip snitch infos (local DC and rack)
-        gossipSnitchInfo();
-        // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
-        Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
+            if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
+                throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
+            if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
+                throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
+            if (DatabaseDescriptor.isReplacing())
+            {
+                if (!DatabaseDescriptor.isAutoBootstrap())
+                    throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
+                bootstrapTokens = prepareReplacementInfo();
+                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+            }
+            else if (shouldBootstrap())
+            {
+                checkForEndpointCollision();
+            }
+            // have to start the gossip service before we can see any info on other nodes.  this is necessary
+            // for bootstrap to get the load info it needs.
+            // (we won't be part of the storage ring though until we add a counterId to our state, below.)
+            // Seed the host ID-to-endpoint map with our own ID.
+            getTokenMetadata().updateHostId(SystemKeyspace.getLocalHostId(), FBUtilities.getBroadcastAddress());
+            appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
+            appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemKeyspace.getLocalHostId()));
+            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
+            appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
+            logger.info("Starting up server gossip");
+            Gossiper.instance.register(this);
+            Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
+            // gossip snitch infos (local DC and rack)
+            gossipSnitchInfo();
+            // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
+            Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
 
 
-        if (!MessagingService.instance().isListening())
-            MessagingService.instance().listen(FBUtilities.getLocalAddress());
-        LoadBroadcaster.instance.startBroadcasting();
+            if (!MessagingService.instance().isListening())
+                MessagingService.instance().listen(FBUtilities.getLocalAddress());
+            LoadBroadcaster.instance.startBroadcasting();
 
-        HintedHandOffManager.instance.start();
-        BatchlogManager.instance.start();
+            HintedHandOffManager.instance.start();
+            BatchlogManager.instance.start();
+        }
+    }
+
+    private void joinTokenRing(int delay) throws ConfigurationException
+    {
+        joined = true;
 
         // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
         // If we are a seed, or if the user manually sets auto_bootstrap to false,
@@ -727,7 +745,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException(s);
                 }
                 setMode(Mode.JOINING, "getting bootstrap token", true);
-                tokens = BootStrapper.getBootstrapTokens(tokenMetadata);
+                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata);
             }
             else
             {
@@ -745,7 +763,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     }
 
                     // check for operator errors...
-                    for (Token token : tokens)
+                    for (Token token : bootstrapTokens)
                     {
                         InetAddress existing = tokenMetadata.getEndpoint(token);
                         if (existing != null)
@@ -773,46 +791,46 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     }
 
                 }
-                setMode(Mode.JOINING, "Replacing a node with token(s): " + tokens, true);
+                setMode(Mode.JOINING, "Replacing a node with token(s): " + bootstrapTokens, true);
             }
 
-            bootstrap(tokens);
+            bootstrap(bootstrapTokens);
             assert !isBootstrapMode; // bootstrap will block until finished
         }
         else
         {
-            tokens = SystemKeyspace.getSavedTokens();
-            if (tokens.isEmpty())
+            bootstrapTokens = SystemKeyspace.getSavedTokens();
+            if (bootstrapTokens.isEmpty())
             {
                 Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
                 if (initialTokens.size() < 1)
                 {
-                    tokens = BootStrapper.getRandomTokens(tokenMetadata, DatabaseDescriptor.getNumTokens());
+                    bootstrapTokens = BootStrapper.getRandomTokens(tokenMetadata, DatabaseDescriptor.getNumTokens());
                     if (DatabaseDescriptor.getNumTokens() == 1)
-                        logger.warn("Generated random token " + tokens + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
+                        logger.warn("Generated random token " + bootstrapTokens + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
                     else
-                        logger.info("Generated random tokens. tokens are {}", tokens);
+                        logger.info("Generated random tokens. tokens are {}", bootstrapTokens);
                 }
                 else
                 {
-                    tokens = new ArrayList<Token>(initialTokens.size());
+                    bootstrapTokens = new ArrayList<Token>(initialTokens.size());
                     for (String token : initialTokens)
-                        tokens.add(getPartitioner().getTokenFactory().fromString(token));
-                    logger.info("Saved tokens not found. Using configuration value: {}", tokens);
+                        bootstrapTokens.add(getPartitioner().getTokenFactory().fromString(token));
+                    logger.info("Saved tokens not found. Using configuration value: {}", bootstrapTokens);
                 }
             }
             else
             {
                 // if we were already bootstrapped with 1 token but num_tokens is set higher in the config,
                 // then we need to migrate to multi-token
-                if (tokens.size() == 1 && DatabaseDescriptor.getNumTokens() > 1)
+                if (bootstrapTokens.size() == 1 && DatabaseDescriptor.getNumTokens() > 1)
                 {
                     // wait for ring info
                     logger.info("Sleeping for ring delay (" + delay + "ms)");
                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
                     logger.info("Calculating new tokens");
                     // calculate num_tokens tokens evenly spaced in the range (left, right]
-                    Token right = tokens.iterator().next();
+                    Token right = bootstrapTokens.iterator().next();
                     TokenMetadata clone = tokenMetadata.cloneOnlyTokenMap();
                     clone.updateNormalToken(right, FBUtilities.getBroadcastAddress());
                     Token left = clone.getPredecessor(right);
@@ -842,12 +860,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                 r = midpoint;
                             midpoint = getPartitioner().midpoint(l, r);
                         }
-                        tokens.add(midpoint);
+                        bootstrapTokens.add(midpoint);
                     }
-                    logger.info("Split previous range (" + left + ", " + right + "] into " + tokens);
+                    logger.info("Split previous range (" + left + ", " + right + "] into " + bootstrapTokens);
                 }
                 else
-                    logger.info("Using saved tokens " + tokens);
+                    logger.info("Using saved tokens " + bootstrapTokens);
             }
         }
 
@@ -862,7 +880,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             // start participating in the ring.
             SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
-            setTokens(tokens);
+            setTokens(bootstrapTokens);
             // remove the existing info about the replaced node.
             if (!current.isEmpty())
                 for (InetAddress existing : current)