You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/07/31 22:57:07 UTC
[1/6] git commit: Merge branch 'cassandra-1.1' into trunk
Updated Branches:
refs/heads/cassandra-1.1 953bcb345 -> 988c4132d
refs/heads/trunk 738bd9e15 -> a5e0331c7
Merge branch 'cassandra-1.1' into trunk
Conflicts:
src/java/org/apache/cassandra/dht/BootStrapper.java
src/java/org/apache/cassandra/service/StorageService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5e0331c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5e0331c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5e0331c
Branch: refs/heads/trunk
Commit: a5e0331c70bbe1139f0161356211699ada0e1255
Parents: 738bd9e 988c413
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jul 31 15:56:46 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jul 31 15:56:46 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/dht/BootStrapper.java | 15 ++-
.../apache/cassandra/service/StorageService.java | 69 +++++++--------
2 files changed, 41 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5e0331c/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java
index ba14c32,8afcdee..1942be8
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@@ -15,33 -15,37 +15,37 @@@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.dht;
- import java.io.IOException;
- import java.net.InetAddress;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
-
- import com.google.common.base.Charsets;
- import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.gms.*;
-
- import org.apache.commons.lang.ArrayUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.cassandra.config.ConfigurationException;
- import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.Table;
- import org.apache.cassandra.locator.AbstractReplicationStrategy;
- import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.IAsyncCallback;
- import org.apache.cassandra.net.IVerbHandler;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.streaming.OperationType;
- import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.utils.SimpleCondition;
-
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
++import org.apache.cassandra.config.Schema;
++import org.apache.cassandra.gms.*;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Table;
- import org.apache.cassandra.db.TypeSizes;
- import org.apache.cassandra.gms.FailureDetector;
- import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.*;
++import org.apache.cassandra.net.IAsyncCallback;
++import org.apache.cassandra.net.IVerbHandler;
++import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SimpleCondition;
++import org.apache.cassandra.db.TypeSizes;
++import org.apache.cassandra.gms.FailureDetector;
++import org.apache.cassandra.io.IVersionedSerializer;
++import org.apache.cassandra.net.*;
public class BootStrapper
{
@@@ -83,49 -87,40 +87,50 @@@
}
/**
- * if initialtoken was specified, use that.
- * otherwise, pick a token to assume half the load of the most-loaded node.
+ * if initialtoken was specified, use that (split on comma).
+ * otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
+ * else choose num_tokens tokens at random
*/
- public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress, Double> load) throws IOException, ConfigurationException
+ public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, Map<InetAddress, Double> load) throws IOException, ConfigurationException
{
- // if user specified a token, use that
- if (DatabaseDescriptor.getInitialToken() != null)
+ Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
++ // if user specified tokens, use those
+ if (initialTokens.size() > 0)
{
- logger.debug("token manually specified as " + DatabaseDescriptor.getInitialToken());
- Token token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
- if (metadata.getEndpoint(token) != null)
- throw new ConfigurationException("Bootstraping to existing token " + token + " is not allowed (decommission/removetoken the old node first).");
- return token;
- }
-
- // if there is a schema, then we're joining an existing cluster so get a "balanced" token
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
- {
- if (entry.getKey().equals(FBUtilities.getBroadcastAddress()))
+ logger.debug("tokens manually specified as {}", initialTokens);
+ List<Token> tokens = new ArrayList<Token>();
+ for (String tokenString : initialTokens)
{
- // skip ourselves to avoid confusing the tests, which always load a schema first thing
- continue;
+ Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+ if (metadata.getEndpoint(token) != null)
+ throw new ConfigurationException("Bootstraping to existing token " + tokenString + " is not allowed (decommission/removetoken the old node first).");
+ tokens.add(token);
}
-
- VersionedValue schemaValue = entry.getValue().getApplicationState(ApplicationState.SCHEMA);
- if (schemaValue != null && !schemaValue.value.equals(Schema.emptyVersion.toString()))
- return getBalancedToken(metadata, load);
+ return tokens;
}
- // no schema; pick a random token (so multiple non-seeds starting up simultaneously in a new cluster
- // don't get the same token; see CASSANDRA-3219)
- return StorageService.getPartitioner().getRandomToken();
+ int numTokens = DatabaseDescriptor.getNumTokens();
+ if (numTokens < 1)
+ throw new ConfigurationException("num_tokens must be >= 1");
+ if (numTokens == 1)
+ return Collections.singleton(getBalancedToken(metadata, load));
+
+ return getRandomTokens(metadata, numTokens);
+ }
+
+ public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
+ {
+ Set<Token> tokens = new HashSet<Token>(numTokens);
+ while (tokens.size() < numTokens)
+ {
+ Token token = StorageService.getPartitioner().getRandomToken();
+ if (metadata.getEndpoint(token) == null)
+ tokens.add(token);
+ }
+ return tokens;
}
+ @Deprecated
public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load)
{
InetAddress maxEndpoint = getBootstrapSource(metadata, load);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5e0331c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 7a77d02,b1eaa1e..8abd3e8
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -510,55 -557,48 +510,48 @@@ public class StorageService implements
HintedHandOffManager.instance.start();
- boolean schemaPresent = false;
- if (DatabaseDescriptor.isAutoBootstrap() && !SystemTable.bootstrapComplete() && delay > 0)
- {
- // wait a couple gossip rounds so our schema check has something to go by
- FBUtilities.sleep(2 * Gossiper.intervalInMillis);
- }
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
- {
- if (entry.getKey().equals(FBUtilities.getBroadcastAddress()))
- {
- // skip ourselves to avoid confusing the tests, which always load a schema first thing
- continue;
- }
-
- VersionedValue schemaValue = entry.getValue().getApplicationState(ApplicationState.SCHEMA);
- if (schemaValue != null && !schemaValue.value.equals(Schema.emptyVersion.toString()))
- {
- schemaPresent = true;
- break;
- }
- }
-
- // We can bootstrap at startup, or if we detect a previous attempt that failed. Either way, if the user
- // manually sets auto_bootstrap to false, we'll skip streaming data from other nodes and jump directly
- // into the ring.
+ // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
+ // If we are a seed, or if the user manually sets auto_bootstrap to false,
+ // we'll skip streaming data from other nodes and jump directly into the ring.
+ //
+ // The seed check allows us to skip the RING_DELAY sleep for the single-node cluster case,
+ // which is useful for both new users and testing.
//
- // The one exception is if after the above sleep we still have no schema information, we'll assume
- // we're part of a fresh cluster start, and also skip bootstrap. This is less confusing for new users,
- // as well as avoiding the nonsensical state of trying to stream from cluster with no active peers.
+ // We attempted to replace this with a schema-presence check, but you need a meaningful sleep
+ // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
- Token<?> token;
- InetAddress current = null;
- logger_.debug("Bootstrap variables: {} {} {} {}",
+ Set<InetAddress> current = new HashSet<InetAddress>();
+ Collection<Token> tokens;
- logger.debug(String.format("Bootstrap variables: %s %s %s %s",
- DatabaseDescriptor.isAutoBootstrap(), SystemTable.bootstrapInProgress(), SystemTable.bootstrapComplete(), schemaPresent));
++ logger.debug("Bootstrap variables: {} {} {} {}",
+ new Object[]{ DatabaseDescriptor.isAutoBootstrap(),
+ SystemTable.bootstrapInProgress(),
+ SystemTable.bootstrapComplete(),
+ DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())});
if (DatabaseDescriptor.isAutoBootstrap()
- && (SystemTable.bootstrapInProgress() || (!SystemTable.bootstrapComplete() && schemaPresent)))
+ && !SystemTable.bootstrapComplete()
+ && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
{
if (SystemTable.bootstrapInProgress())
- logger_.warn("Detected previous bootstrap failure; retrying");
+ logger.warn("Detected previous bootstrap failure; retrying");
else
SystemTable.setBootstrapState(SystemTable.BootstrapState.IN_PROGRESS);
setMode(Mode.JOINING, "waiting for ring information", true);
// first sleep the delay to make sure we see all our peers
- try
+ for (int i = 0; i < delay; i += 1000)
{
- Thread.sleep(delay);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
+ // if we see schema, we can proceed to the next check directly
+ if (!Schema.instance.getVersion().equals(Schema.emptyVersion))
+ {
- logger_.debug("got schema: {}", Schema.instance.getVersion());
++ logger.debug("got schema: {}", Schema.instance.getVersion());
+ break;
+ }
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
// if our schema hasn't matched yet, keep sleeping until it does
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)