You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/07/31 22:57:07 UTC

[1/6] git commit: Merge branch 'cassandra-1.1' into trunk

Updated Branches:
  refs/heads/cassandra-1.1 953bcb345 -> 988c4132d
  refs/heads/trunk 738bd9e15 -> a5e0331c7


Merge branch 'cassandra-1.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/dht/BootStrapper.java
	src/java/org/apache/cassandra/service/StorageService.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5e0331c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5e0331c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5e0331c

Branch: refs/heads/trunk
Commit: a5e0331c70bbe1139f0161356211699ada0e1255
Parents: 738bd9e 988c413
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jul 31 15:56:46 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jul 31 15:56:46 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/BootStrapper.java     |   15 ++-
 .../apache/cassandra/service/StorageService.java   |   69 +++++++--------
 2 files changed, 41 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5e0331c/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java
index ba14c32,8afcdee..1942be8
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@@ -15,33 -15,37 +15,37 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 -
  package org.apache.cassandra.dht;
  
 - import java.io.IOException;
 - import java.net.InetAddress;
 - import java.util.*;
 - import java.util.concurrent.TimeUnit;
 - import java.util.concurrent.locks.Condition;
 -
 - import com.google.common.base.Charsets;
 - import org.apache.cassandra.config.Schema;
 - import org.apache.cassandra.gms.*;
 -
 - import org.apache.commons.lang.ArrayUtils;
 - import org.slf4j.Logger;
 - import org.slf4j.LoggerFactory;
 -
 - import org.apache.cassandra.config.ConfigurationException;
 - import org.apache.cassandra.config.DatabaseDescriptor;
 - import org.apache.cassandra.db.Table;
 - import org.apache.cassandra.locator.AbstractReplicationStrategy;
 - import org.apache.cassandra.locator.TokenMetadata;
 - import org.apache.cassandra.net.IAsyncCallback;
 - import org.apache.cassandra.net.IVerbHandler;
 - import org.apache.cassandra.net.Message;
 - import org.apache.cassandra.net.MessagingService;
 - import org.apache.cassandra.service.StorageService;
 - import org.apache.cassandra.streaming.OperationType;
 - import org.apache.cassandra.utils.FBUtilities;
 - import org.apache.cassandra.utils.SimpleCondition;
 -
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.*;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.Condition;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
++import org.apache.cassandra.config.Schema;
++import org.apache.cassandra.gms.*;
 +
 +import org.apache.cassandra.config.ConfigurationException;
 +import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.Table;
- import org.apache.cassandra.db.TypeSizes;
- import org.apache.cassandra.gms.FailureDetector;
- import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.locator.AbstractReplicationStrategy;
 +import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.*;
++import org.apache.cassandra.net.IAsyncCallback;
++import org.apache.cassandra.net.IVerbHandler;
++import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.OperationType;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.SimpleCondition;
++import org.apache.cassandra.db.TypeSizes;
++import org.apache.cassandra.gms.FailureDetector;
++import org.apache.cassandra.io.IVersionedSerializer;
++import org.apache.cassandra.net.*;
  
  public class BootStrapper
  {
@@@ -83,49 -87,40 +87,50 @@@
      }
  
      /**
 -     * if initialtoken was specified, use that.
 -     * otherwise, pick a token to assume half the load of the most-loaded node.
 +     * if initialtoken was specified, use that (split on comma).
 +     * otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
 +     * else choose num_tokens tokens at random
       */
 -    public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress, Double> load) throws IOException, ConfigurationException
 +    public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, Map<InetAddress, Double> load) throws IOException, ConfigurationException
      {
 -        // if user specified a token, use that
 -        if (DatabaseDescriptor.getInitialToken() != null)
 +        Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
++        // if user specified tokens, use those
 +        if (initialTokens.size() > 0)
          {
 -            logger.debug("token manually specified as " + DatabaseDescriptor.getInitialToken());
 -            Token token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
 -            if (metadata.getEndpoint(token) != null)
 -                throw new ConfigurationException("Bootstraping to existing token " + token + " is not allowed (decommission/removetoken the old node first).");
 -            return token;
 -        }
 -
 -        // if there is a schema, then we're joining an existing cluster so get a "balanced" token
 -        for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
 -        {
 -            if (entry.getKey().equals(FBUtilities.getBroadcastAddress()))
 +            logger.debug("tokens manually specified as {}",  initialTokens);
 +            List<Token> tokens = new ArrayList<Token>();
 +            for (String tokenString : initialTokens)
              {
 -                // skip ourselves to avoid confusing the tests, which always load a schema first thing
 -                continue;
 +                Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
 +                if (metadata.getEndpoint(token) != null)
 +                    throw new ConfigurationException("Bootstraping to existing token " + tokenString + " is not allowed (decommission/removetoken the old node first).");
 +                tokens.add(token);
              }
 -
 -            VersionedValue schemaValue = entry.getValue().getApplicationState(ApplicationState.SCHEMA);
 -            if (schemaValue != null && !schemaValue.value.equals(Schema.emptyVersion.toString()))
 -                return getBalancedToken(metadata, load);
 +            return tokens;
          }
  
 -        // no schema; pick a random token (so multiple non-seeds starting up simultaneously in a new cluster
 -        // don't get the same token; see CASSANDRA-3219)
 -        return StorageService.getPartitioner().getRandomToken();
 +        int numTokens = DatabaseDescriptor.getNumTokens();
 +        if (numTokens < 1)
 +            throw new ConfigurationException("num_tokens must be >= 1");
 +        if (numTokens == 1)
 +            return Collections.singleton(getBalancedToken(metadata, load));
 +
 +        return getRandomTokens(metadata, numTokens);
 +    }
 +
 +    public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
 +    {
 +        Set<Token> tokens = new HashSet<Token>(numTokens);
 +        while (tokens.size() < numTokens)
 +        {
 +            Token token = StorageService.getPartitioner().getRandomToken();
 +            if (metadata.getEndpoint(token) == null)
 +                tokens.add(token);
 +        }
 +        return tokens;
      }
  
 +    @Deprecated
      public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load)
      {
          InetAddress maxEndpoint = getBootstrapSource(metadata, load);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5e0331c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 7a77d02,b1eaa1e..8abd3e8
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -510,55 -557,48 +510,48 @@@ public class StorageService implements 
  
          HintedHandOffManager.instance.start();
  
-         boolean schemaPresent = false;
-         if (DatabaseDescriptor.isAutoBootstrap() && !SystemTable.bootstrapComplete() && delay > 0)
-         {
-             // wait a couple gossip rounds so our schema check has something to go by
-             FBUtilities.sleep(2 * Gossiper.intervalInMillis);
-         }
-         for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
-         {
-             if (entry.getKey().equals(FBUtilities.getBroadcastAddress()))
-             {
-                 // skip ourselves to avoid confusing the tests, which always load a schema first thing
-                 continue;
-             }
- 
-             VersionedValue schemaValue = entry.getValue().getApplicationState(ApplicationState.SCHEMA);
-             if (schemaValue != null && !schemaValue.value.equals(Schema.emptyVersion.toString()))
-             {
-                 schemaPresent = true;
-                 break;
-             }
-         }
- 
-         // We can bootstrap at startup, or if we detect a previous attempt that failed.  Either way, if the user
-         // manually sets auto_bootstrap to false, we'll skip streaming data from other nodes and jump directly
-         // into the ring.
+         // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
+         // If we are a seed, or if the user manually sets auto_bootstrap to false,
+         // we'll skip streaming data from other nodes and jump directly into the ring.
+         //
+         // The seed check allows us to skip the RING_DELAY sleep for the single-node cluster case,
+         // which is useful for both new users and testing.
          //
-         // The one exception is if after the above sleep we still have no schema information, we'll assume
-         // we're part of a fresh cluster start, and also skip bootstrap.  This is less confusing for new users,
-         // as well as avoiding the nonsensical state of trying to stream from cluster with no active peers.
+         // We attempted to replace this with a schema-presence check, but you need a meaningful sleep
+         // to get schema info from gossip which defeats the purpose.  See CASSANDRA-4427 for the gory details.
 -        Token<?> token;
 -        InetAddress current = null;
 -        logger_.debug("Bootstrap variables: {} {} {} {}",
 +        Set<InetAddress> current = new HashSet<InetAddress>();
 +        Collection<Token> tokens;
-         logger.debug(String.format("Bootstrap variables: %s %s %s %s",
-                       DatabaseDescriptor.isAutoBootstrap(), SystemTable.bootstrapInProgress(), SystemTable.bootstrapComplete(), schemaPresent));
++        logger.debug("Bootstrap variables: {} {} {} {}",
+                       new Object[]{ DatabaseDescriptor.isAutoBootstrap(),
+                                     SystemTable.bootstrapInProgress(),
+                                     SystemTable.bootstrapComplete(),
+                                     DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())});
          if (DatabaseDescriptor.isAutoBootstrap()
-             && (SystemTable.bootstrapInProgress() || (!SystemTable.bootstrapComplete() && schemaPresent)))
+             && !SystemTable.bootstrapComplete()
+             && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
          {
              if (SystemTable.bootstrapInProgress())
 -                logger_.warn("Detected previous bootstrap failure; retrying");
 +                logger.warn("Detected previous bootstrap failure; retrying");
              else
                  SystemTable.setBootstrapState(SystemTable.BootstrapState.IN_PROGRESS);
              setMode(Mode.JOINING, "waiting for ring information", true);
              // first sleep the delay to make sure we see all our peers
-             try
+             for (int i = 0; i < delay; i += 1000)
              {
-                 Thread.sleep(delay);
-             }
-             catch (InterruptedException e)
-             {
-                 throw new AssertionError(e);
+                 // if we see schema, we can proceed to the next check directly
+                 if (!Schema.instance.getVersion().equals(Schema.emptyVersion))
+                 {
 -                    logger_.debug("got schema: {}", Schema.instance.getVersion());
++                    logger.debug("got schema: {}", Schema.instance.getVersion());
+                     break;
+                 }
+                 try
+                 {
+                     Thread.sleep(1000);
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new AssertionError(e);
+                 }
              }
              // if our schema hasn't matched yet, keep sleeping until it does
              // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)