You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/09/04 17:43:06 UTC
git commit: Binary encoding of tokens. Also promotes new gossip
states: HOST_ID and TOKENS Patch by brandonwilliams,
reviewed by eevans for CASSANDRA-4383
Updated Branches:
refs/heads/trunk cf5a31ff9 -> b475bc69b
Binary encoding of tokens.
Also promotes new gossip states: HOST_ID and TOKENS
Patch by brandonwilliams, reviewed by eevans for CASSANDRA-4383
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b475bc69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b475bc69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b475bc69
Branch: refs/heads/trunk
Commit: b475bc69b7e6d08e6d12527578552e67e8c0f88a
Parents: cf5a31f
Author: Brandon Williams <br...@apache.org>
Authored: Tue Sep 4 10:41:58 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Sep 4 10:41:58 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/gms/ApplicationState.java | 2 +
.../org/apache/cassandra/gms/FailureDetector.java | 4 +
src/java/org/apache/cassandra/gms/Gossiper.java | 28 ++++-
.../org/apache/cassandra/gms/TokenSerializer.java | 68 ++++++++++
.../org/apache/cassandra/gms/VersionedValue.java | 44 +++++--
.../apache/cassandra/service/StorageService.java | 105 +++++++--------
test/unit/org/apache/cassandra/Util.java | 5 +-
.../org/apache/cassandra/dht/BootStrapperTest.java | 12 +-
.../apache/cassandra/gms/SerializationsTest.java | 3 +-
.../service/AntiEntropyServiceTestAbstract.java | 2 +-
.../cassandra/service/LeaveAndBootstrapTest.java | 46 ++++---
.../org/apache/cassandra/service/MoveTest.java | 22 ++--
.../org/apache/cassandra/service/RemoveTest.java | 4 -
13 files changed, 234 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index f23a6fc..777dfc5 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -31,6 +31,8 @@ public enum ApplicationState
X_11_PADDING, // padding specifically for 1.1
SEVERITY,
NET_VERSION,
+ HOST_ID,
+ TOKENS,
// pad to allow adding new states to existing cluster
X1,
X2,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index c58a559..2b3905a 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -101,7 +101,11 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
private void appendEndpointState(StringBuilder sb, EndpointState endpointState)
{
for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.applicationState.entrySet())
+ {
+ if (state.getKey() == ApplicationState.TOKENS)
+ continue;
sb.append(" ").append(state.getKey()).append(":").append(state.getValue().value).append("\n");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index a771197..2404e40 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -618,6 +618,22 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.entrySet();
}
+ public boolean usesHostId(InetAddress endpoint)
+ {
+ if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+ return true;
+ else if (getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12)
+ return true;
+ return false;
+ }
+
+ public UUID getHostId(InetAddress endpoint)
+ {
+ if (!usesHostId(endpoint))
+ throw new RuntimeException("Host " + endpoint + " does not use new-style tokens!");
+ return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+ }
+
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
{
EndpointState epState = endpointStateMap.get(forEndpoint);
@@ -1075,7 +1091,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
/**
* This should *only* be used for testing purposes.
*/
- public void initializeNodeUnsafe(InetAddress addr, int generationNbr) {
+ public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) {
/* initialize the heartbeat state for this localEndpoint */
EndpointState localState = endpointStateMap.get(addr);
if ( localState == null )
@@ -1087,6 +1103,16 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
// always add the version state
localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+ localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+ }
+
+ /**
+ * This should *only* be used for testing purposes
+ */
+ public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ {
+ EndpointState localState = endpointStateMap.get(endpoint);
+ localState.addApplicationState(state, value);
}
public long getEndpointDowntime(String address) throws UnknownHostException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/TokenSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/TokenSerializer.java b/src/java/org/apache/cassandra/gms/TokenSerializer.java
new file mode 100644
index 0000000..b55967c
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/TokenSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.ISerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class TokenSerializer
+{
+ private static final Logger logger = LoggerFactory.getLogger(TokenSerializer.class);
+
+ public static void serialize(IPartitioner partitioner, Collection<Token> tokens, DataOutput dos) throws IOException
+ {
+ for (Token<?> token : tokens)
+ {
+ byte[] bintoken = partitioner.getTokenFactory().toByteArray(token).array();
+ dos.writeInt(bintoken.length);
+ dos.write(bintoken);
+ }
+ dos.writeInt(0);
+ }
+
+ public static Collection<Token> deserialize(IPartitioner partitioner, DataInput dis) throws IOException
+ {
+ Collection<Token> tokens = new ArrayList<Token>();
+ while (true)
+ {
+ int size = dis.readInt();
+ if (size < 1)
+ break;
+ logger.trace("Reading token of {} bytes", size);
+ byte[] bintoken = new byte[size];
+ dis.readFully(bintoken);
+ tokens.add(partitioner.getTokenFactory().fromByteArray(ByteBuffer.wrap(bintoken)));
+ }
+ return tokens;
+ }
+
+ public static long serializedSize(Collection<Token> tokens, TypeSizes typeSizes)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index f3d5541..92c79eb 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -18,12 +18,16 @@
package org.apache.cassandra.gms;
import java.io.*;
+
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import com.google.common.collect.Iterables;
+import static com.google.common.base.Charsets.ISO_8859_1;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
@@ -47,6 +51,7 @@ import org.apache.commons.lang.StringUtils;
public class VersionedValue implements Comparable<VersionedValue>
{
+
public static final IVersionedSerializer<VersionedValue> serializer = new VersionedValueSerializer();
// this must be a char that cannot be present in any token
@@ -67,8 +72,6 @@ public class VersionedValue implements Comparable<VersionedValue>
// values for ApplicationState.REMOVAL_COORDINATOR
public final static String REMOVAL_COORDINATOR = "REMOVER";
- // network proto version from MS
- public final static String NET_VERSION = "NET_VERSION";
public final int version;
public final String value;
@@ -110,26 +113,21 @@ public class VersionedValue implements Comparable<VersionedValue>
this.partitioner = partitioner;
}
- public VersionedValue bootstrapping(Collection<Token> tokens, UUID hostId)
+ public VersionedValue bootstrapping(Collection<Token> tokens)
{
return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
- hostId.toString(),
makeTokenString(tokens)));
}
- public VersionedValue normal(Collection<Token> tokens, UUID hostId)
+ public VersionedValue normal(Collection<Token> tokens)
{
return new VersionedValue(versionString(VersionedValue.STATUS_NORMAL,
- hostId.toString(),
makeTokenString(tokens)));
}
private String makeTokenString(Collection<Token> tokens)
{
- List<String> tokenStrings = new ArrayList<String>();
- for (Token<?> token : tokens)
- tokenStrings.add(partitioner.getTokenFactory().toString(token));
- return StringUtils.join(tokenStrings, VersionedValue.DELIMITER);
+ return partitioner.getTokenFactory().toString(Iterables.get(tokens, 0));
}
public VersionedValue load(double load)
@@ -145,14 +143,14 @@ public class VersionedValue implements Comparable<VersionedValue>
public VersionedValue leaving(Collection<Token> tokens)
{
return new VersionedValue(versionString(VersionedValue.STATUS_LEAVING,
- makeTokenString(tokens)));
+ makeTokenString(tokens)));
}
public VersionedValue left(Collection<Token> tokens, long expireTime)
{
return new VersionedValue(versionString(VersionedValue.STATUS_LEFT,
- Long.toString(expireTime),
- makeTokenString(tokens)));
+ Long.toString(expireTime),
+ makeTokenString(tokens)));
}
public VersionedValue moving(Token token)
@@ -160,6 +158,26 @@ public class VersionedValue implements Comparable<VersionedValue>
return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
+ public VersionedValue hostId(UUID hostId)
+ {
+ return new VersionedValue(hostId.toString());
+ }
+
+ public VersionedValue tokens(Collection<Token> tokens)
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try
+ {
+ TokenSerializer.serialize(partitioner, tokens, dos);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return new VersionedValue(new String(bos.toByteArray(), ISO_8859_1));
+ }
+
public VersionedValue removingNonlocal(UUID hostId)
{
return new VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0c6e7df..4f06f1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.service;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
@@ -29,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import static com.google.common.base.Charsets.ISO_8859_1;
import com.google.common.collect.*;
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
@@ -180,8 +183,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.debug("Setting tokens to {}", tokens);
SystemTable.updateTokens(tokens);
tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
- valueFactory.normal(getLocalTokens(), SystemTable.getLocalHostId()));
+ // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
+ Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(getLocalTokens()));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(getLocalTokens()));
setMode(Mode.NORMAL, false);
}
@@ -495,6 +499,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
// gossip network proto version
Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
+ Gossiper.instance.addLocalApplicationState(ApplicationState.HOST_ID, valueFactory.hostId(SystemTable.getLocalHostId()));
// gossip schema version when gossiper is running
Schema.instance.updateVersionAndAnnounce();
// add rpc listening info
@@ -803,8 +808,10 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (0 == DatabaseDescriptor.getReplaceTokens().size())
{
// if not an existing token then bootstrap
+ // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
+ Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
- valueFactory.bootstrapping(tokens, SystemTable.getLocalHostId()));
+ valueFactory.bootstrapping(tokens));
setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
try
{
@@ -1105,19 +1112,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- /**
- * Checks MS for the version, provided MS _really_ knows it (has directly communicated with the node) otherwise falls back to checking the gossipped version (learned about this node indirectly)
- * If both fail, the node is too old to use hostid-style status serialization
- * @param endpoint
- * @return boolean whether or not to use hostid
- */
- private boolean usesHostId(InetAddress endpoint)
+ private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
{
- if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
- return true;
- else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12)
- return true;
- return false;
+ String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;
+ return vvalue.getBytes(ISO_8859_1);
+ }
+
+ private Collection<Token> getTokensFor(InetAddress endpoint)
+ {
+ try
+ {
+ return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -1132,18 +1142,13 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// Parse versioned values according to end-point version:
// versions < 1.2 .....: STATUS,TOKEN
- // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
- int tokenPos;
- if (usesHostId(endpoint))
- {
- assert pieces.length >= 3;
- tokenPos = 2;
- }
- else tokenPos = 1;
-
- Collection<Token> tokens = new ArrayList<Token>();
- for (int i = tokenPos; i < pieces.length; ++i)
- tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
+ // versions >= 1.2 .....: use TOKENS app state
+ Collection<Token> tokens;
+ // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
+ if (Gossiper.instance.usesHostId(endpoint) && Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.TOKENS) != null)
+ tokens = getTokensFor(endpoint);
+ else
+ tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1]));
if (logger.isDebugEnabled())
logger.debug("Node " + endpoint + " state bootstrapping, token " + tokens);
@@ -1166,8 +1171,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
tokenMetadata.addBootstrapTokens(tokens, endpoint);
calculatePendingRanges();
- if (usesHostId(endpoint))
- tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
+ if (Gossiper.instance.usesHostId(endpoint))
+ tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
}
/**
@@ -1183,20 +1188,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// Parse versioned values according to end-point version:
// versions < 1.2 .....: STATUS,TOKEN
- // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
- int tokensPos;
- if (usesHostId(endpoint))
- {
- assert pieces.length >= 3;
- tokensPos = 2;
- }
- else
- tokensPos = 1;
- logger.debug("Using token position {} for {}", tokensPos, endpoint);
+ // versions >= 1.2 .....: uses HOST_ID/TOKENS app states
+
+ Collection<Token> tokens;
- Collection<Token> tokens = new ArrayList<Token>();
- for (int i = tokensPos; i < pieces.length; ++i)
- tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
+ if (Gossiper.instance.usesHostId(endpoint))
+ tokens = getTokensFor(endpoint);
+ else
+ tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1]));
if (logger.isDebugEnabled())
logger.debug("Node " + endpoint + " state normal, token " + tokens);
@@ -1205,8 +1204,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.info("Node " + endpoint + " state jump to normal");
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
- if (usesHostId(endpoint))
- tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
+ if (Gossiper.instance.usesHostId(endpoint))
+ tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();
@@ -1278,9 +1277,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private void handleStateLeaving(InetAddress endpoint, String[] pieces)
{
assert pieces.length >= 2;
- Collection<Token> tokens = new ArrayList<Token>();
- for (int i = 1; i < pieces.length; ++i)
- tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
+ Collection<Token> tokens;
+ if (Gossiper.instance.usesHostId(endpoint))
+ tokens = getTokensFor(endpoint);
+ else
+ tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1]));
if (logger.isDebugEnabled())
logger.debug("Node " + endpoint + " state leaving, tokens " + tokens);
@@ -1314,16 +1315,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private void handleStateLeft(InetAddress endpoint, String[] pieces)
{
assert pieces.length >= 2;
- Collection<Token> tokens = null;
+ Collection<Token> tokens;
Integer version = MessagingService.instance().getVersion(endpoint);
- if (version < MessagingService.VERSION_12)
+ if (!Gossiper.instance.usesHostId(endpoint))
tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1]));
else
- {
- tokens = new ArrayList<Token>(pieces.length - 2);
- for (int i = 2; i < pieces.length; ++i)
- tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
- }
+ tokens = getTokensFor(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node " + endpoint + " state left, tokens " + tokens);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 336755c..7a04f38 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -230,10 +230,11 @@ public class Util
for (int i=0; i<endpointTokens.size(); i++)
{
InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
- Gossiper.instance.initializeNodeUnsafe(ep, 1);
+ Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
+ Gossiper.instance.injectApplicationState(ep, ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i))));
ss.onChange(ep,
ApplicationState.STATUS,
- new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)), hostIds.get(i)));
+ new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
hosts.add(ep);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 1e6d1cb..f7d1c7c 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -85,10 +85,10 @@ public class BootStrapperTest extends SchemaLoader
Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
for (int i = 0; i < addrs.length; i++)
{
- Gossiper.instance.initializeNodeUnsafe(addrs[i], 1);
+ Gossiper.instance.initializeNodeUnsafe(addrs[i], UUID.randomUUID(), 1);
load.put(addrs[i], (double)i+2);
// also make bootstrapping nodes present in gossip
- Gossiper.instance.initializeNodeUnsafe(bootstrapAddrs[i], 1);
+ Gossiper.instance.initializeNodeUnsafe(bootstrapAddrs[i], UUID.randomUUID(), 1);
}
// give every node a bootstrap source.
@@ -102,9 +102,10 @@ public class BootStrapperTest extends SchemaLoader
Range<Token> range = ss.getPrimaryRangeForEndpoint(bootstrapSource);
Token token = StorageService.getPartitioner().midpoint(range.left, range.right);
assert range.contains(token);
+ Gossiper.instance.injectApplicationState(bootstrapAddrs[i], ApplicationState.TOKENS, ss.valueFactory.tokens(Collections.singleton(token)));
ss.onChange(bootstrapAddrs[i],
ApplicationState.STATUS,
- StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(token), bootstrapHostIds[i]));
+ StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(token)));
}
// any further attempt to bootsrtap should fail since every node in the cluster is splitting.
@@ -123,7 +124,7 @@ public class BootStrapperTest extends SchemaLoader
Token token = StorageService.getPartitioner().midpoint(range.left, range.right);
ss.onChange(bootstrapAddrs[2],
ApplicationState.STATUS,
- StorageService.instance.valueFactory.normal(Collections.singleton(token), bootstrapHostIds[2]));
+ StorageService.instance.valueFactory.normal(Collections.singleton(token)));
load.put(bootstrapAddrs[2], 0d);
InetAddress addr = BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load);
assert addr != null && addr.equals(addrs[2]);
@@ -155,9 +156,10 @@ public class BootStrapperTest extends SchemaLoader
Range<Token> range5 = ss.getPrimaryRangeForEndpoint(five);
Token fakeToken = StorageService.getPartitioner().midpoint(range5.left, range5.right);
assert range5.contains(fakeToken);
+ Gossiper.instance.injectApplicationState(myEndpoint, ApplicationState.TOKENS, ss.valueFactory.tokens(Collections.singleton(fakeToken)));
ss.onChange(myEndpoint,
ApplicationState.STATUS,
- StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(fakeToken), UUID.randomUUID()));
+ StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(fakeToken)));
tmd = ss.getTokenMetadata();
InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index b14608f..c27218b 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -114,8 +114,7 @@ public class SerializationsTest extends AbstractSerializationsTester
private static EndpointState EndpointSt = new EndpointState(HeartbeatSt);
private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
private static VersionedValue vv0 = vvFact.load(23d);
- private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken()),
- UUID.randomUUID());
+ private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken()));
private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index acf5173..dd4e1f2 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -98,7 +98,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
assert tmd.isMember(REMOTE);
- Gossiper.instance.initializeNodeUnsafe(REMOTE, 1);
+ Gossiper.instance.initializeNodeUnsafe(REMOTE, UUID.randomUUID(), 1);
local_range = StorageService.instance.getLocalPrimaryRange();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 3b76c2c..7e2130c 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -164,16 +164,17 @@ public class LeaveAndBootstrapTest
// boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
InetAddress boot1 = InetAddress.getByName("127.0.1.1");
- Gossiper.instance.initializeNodeUnsafe(boot1, 1);
- UUID boot1Id = UUID.randomUUID();
+ Gossiper.instance.initializeNodeUnsafe(boot1, UUID.randomUUID(), 1);
+ Gossiper.instance.injectApplicationState(boot1, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(5))));
ss.onChange(boot1,
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), boot1Id));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5))));
InetAddress boot2 = InetAddress.getByName("127.0.1.2");
- Gossiper.instance.initializeNodeUnsafe(boot2, 1);
+ Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1);
+ Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7))));
ss.onChange(boot2,
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID()));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7))));
Collection<InetAddress> endpoints = null;
@@ -329,7 +330,7 @@ public class LeaveAndBootstrapTest
valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[0])), Gossiper.computeExpireTime()));
ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS,
valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[2])), Gossiper.computeExpireTime()));
- ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5)), boot1Id));
+ ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5))));
// adjust precalcuated results. this changes what the epected endpoints are.
expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
@@ -462,18 +463,20 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getBootstrapTokens().isEmpty());
// Bootstrap the node immedidiately to keyTokens.get(4) without going through STATE_LEFT
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(4))));
ss.onChange(hosts.get(2),
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(4)), hostIds.get(2)));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(4))));
assertFalse(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2)));
// Bootstrap node hosts.get(3) to keyTokens.get(1)
+ Gossiper.instance.injectApplicationState(hosts.get(3), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1))));
ss.onChange(hosts.get(3),
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3)));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1))));
assertFalse(tmd.isMember(hosts.get(3)));
assertFalse(tmd.isLeaving(hosts.get(3)));
@@ -481,9 +484,10 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
// Bootstrap node hosts.get(2) further to keyTokens.get(3)
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(3))));
ss.onChange(hosts.get(2),
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(3)), hostIds.get(2)));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(3))));
assertFalse(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -492,10 +496,10 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
// Go to normal again for both nodes
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(3)),
- hostIds.get(2)));
- ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)),
- hostIds.get(3)));
+ Gossiper.instance.injectApplicationState(hosts.get(3), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(2))));
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(3))));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(3))));
+ ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2))));
assertTrue(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -531,8 +535,8 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
// back to normal
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)),
- hostIds.get(2)));
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(2))));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2))));
assertTrue(tmd.getLeavingEndpoints().isEmpty());
assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2)));
@@ -541,8 +545,8 @@ public class LeaveAndBootstrapTest
ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(2))));
ss.onChange(hosts.get(2), ApplicationState.STATUS,
valueFactory.left(Collections.singleton(keyTokens.get(2)), Gossiper.computeExpireTime()));
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4)),
- hostIds.get(2)));
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(4))));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4))));
assertTrue(tmd.getBootstrapTokens().isEmpty());
assertTrue(tmd.getLeavingEndpoints().isEmpty());
@@ -567,6 +571,7 @@ public class LeaveAndBootstrapTest
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);
// node 2 leaves with _different_ token
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(0))));
ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(0))));
assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(0)));
@@ -574,9 +579,10 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null);
// go to boostrap
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1))));
ss.onChange(hosts.get(2),
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(2)));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1))));
assertFalse(tmd.isLeaving(hosts.get(2)));
assertTrue(tmd.getBootstrapTokens().size() == 1);
@@ -621,13 +627,15 @@ public class LeaveAndBootstrapTest
assertFalse(tmd.isMember(hosts.get(2)));
// node hosts.get(4) goes to bootstrap
- ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3)));
+ Gossiper.instance.injectApplicationState(hosts.get(3), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1))));
+ ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1))));
assertFalse(tmd.isMember(hosts.get(3)));
assertTrue(tmd.getBootstrapTokens().size() == 1);
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
// and then directly to 'left'
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1))));
ss.onChange(hosts.get(2), ApplicationState.STATUS,
valueFactory.left(Collections.singleton(keyTokens.get(1)), Gossiper.computeExpireTime()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index cf42564..ce7864c 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -136,7 +136,7 @@ public class MoveTest
}
// moving endpoint back to the normal state
- ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken), hostIds.get(MOVING_NODE)));
+ ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken)));
}
/*
@@ -181,15 +181,17 @@ public class MoveTest
// boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
InetAddress boot1 = InetAddress.getByName("127.0.1.1");
- Gossiper.instance.initializeNodeUnsafe(boot1, 1);
+ Gossiper.instance.initializeNodeUnsafe(boot1, UUID.randomUUID(), 1);
+ Gossiper.instance.injectApplicationState(boot1, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(5))));
ss.onChange(boot1,
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), UUID.randomUUID()));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5))));
InetAddress boot2 = InetAddress.getByName("127.0.1.2");
- Gossiper.instance.initializeNodeUnsafe(boot2, 1);
+ Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1);
+ Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7))));
ss.onChange(boot2,
ApplicationState.STATUS,
- valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID()));
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7))));
// don't require test update every time a new keyspace is added to test/conf/cassandra.yaml
Map<String, AbstractReplicationStrategy> tableStrategyMap = new HashMap<String, AbstractReplicationStrategy>();
@@ -477,7 +479,7 @@ public class MoveTest
{
ss.onChange(hosts.get(movingIndex),
ApplicationState.STATUS,
- valueFactory.normal(Collections.singleton(newTokens.get(movingIndex)), hostIds.get(movingIndex)));
+ valueFactory.normal(Collections.singleton(newTokens.get(movingIndex))));
}
}
@@ -506,8 +508,8 @@ public class MoveTest
assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
// back to normal
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken),
- hostIds.get(2)));
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(newToken)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken)));
assertTrue(tmd.getMovingEndpoints().isEmpty());
assertTrue(tmd.getToken(hosts.get(2)).equals(newToken));
@@ -515,8 +517,8 @@ public class MoveTest
newToken = positionToken(8);
// node 2 goes through leave and left and then jumps to normal at its new token
ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.moving(newToken));
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken),
- hostIds.get(2)));
+ Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(newToken)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken)));
assertTrue(tmd.getBootstrapTokens().isEmpty());
assertTrue(tmd.getMovingEndpoints().isEmpty());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index cfbc013..58ae797 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -88,10 +88,6 @@ public class RemoveTest
MessagingService.instance().listen(FBUtilities.getBroadcastAddress());
Gossiper.instance.start(1);
- for (int i = 0; i < 6; i++)
- {
- Gossiper.instance.initializeNodeUnsafe(hosts.get(i), 1);
- }
removalhost = hosts.get(5);
hosts.remove(removalhost);
removalId = hostIds.get(5);