You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/06/20 04:12:40 UTC
git commit: Gossip protocol version,
use it to determine if new host id should be used. Patch by
brandonwilliams, reviewed by Vijay for CASSANDRA-4317
Updated Branches:
refs/heads/trunk bbcbfd865 -> e6530cc37
Gossip protocol version, use it to determine if new host id should be
used.
Patch by brandonwilliams, reviewed by Vijay for CASSANDRA-4317
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6530cc3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6530cc3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6530cc3
Branch: refs/heads/trunk
Commit: e6530cc3723a7d2fdc84400bf8cd722474eed589
Parents: bbcbfd8
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jun 19 21:10:38 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jun 19 21:10:38 2012 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/gms/ApplicationState.java | 1 +
.../org/apache/cassandra/gms/VersionedValue.java | 7 ++++
.../org/apache/cassandra/net/MessagingService.java | 5 +++
.../apache/cassandra/service/StorageService.java | 26 ++++++++++++--
4 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 4520426..518aa80 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -29,6 +29,7 @@ public enum ApplicationState
INTERNAL_IP,
RPC_ADDRESS,
SEVERITY,
+ NET_VERSION,
// pad to allow adding new states to existing cluster
X1,
X2,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 61bcbe5..ff41fcb 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -64,6 +64,8 @@ public class VersionedValue implements Comparable<VersionedValue>
// values for ApplicationState.REMOVAL_COORDINATOR
public final static String REMOVAL_COORDINATOR = "REMOVER";
+ // network proto version from MS
+ public final static String NET_VERSION = "NET_VERSION";
public final int version;
public final String value;
@@ -184,6 +186,11 @@ public class VersionedValue implements Comparable<VersionedValue>
{
return new VersionedValue(FBUtilities.getReleaseVersionString());
}
+
+ public VersionedValue networkVersion()
+ {
+ return new VersionedValue(String.valueOf(MessagingService.current_version));
+ }
public VersionedValue internalIP(String private_ip)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 9c92402..c0af377 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -798,6 +798,11 @@ public final class MessagingService implements MessagingServiceMBean
return getVersion(InetAddress.getByName(address));
}
+ public boolean knowsVersion(InetAddress endpoint)
+ {
+ return versions.get(endpoint) != null;
+ }
+
public void incrementDroppedMessages(Verb verb)
{
assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0455b1d..012f2ec 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -318,6 +318,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
setMode(Mode.CLIENT, false);
Gossiper.instance.register(this);
Gossiper.instance.start((int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
+ Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
MessagingService.instance().listen(FBUtilities.getLocalAddress());
// sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply).
@@ -465,6 +466,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Gossiper.instance.register(this);
Gossiper.instance.register(migrationManager);
Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
+ // gossip network proto version
+ Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
// gossip schema version when gossiper is running
Schema.instance.updateVersionAndAnnounce();
// add rpc listening info
@@ -1005,6 +1008,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
/**
+ * Checks MS for the version, provided MS _really_ knows it (has directly communicated with the node) otherwise falls back to checking the gossipped version (learned about this node indirectly)
+ * If both fail, the node is too old to use hostid-style status serialization
+ * @param endpoint
+ * @return boolean whether or not to use hostid
+ */
+ private boolean usesHostId(InetAddress endpoint)
+ {
+ if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+ return true;
+ else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12)
+ return true;
+ return false;
+ }
+
+ /**
* Handle node bootstrap
*
* @param endpoint bootstrapping node
@@ -1018,7 +1036,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// versions < 1.2 .....: STATUS,TOKEN
// versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
int tokenPos;
- if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+ if (usesHostId(endpoint))
{
assert pieces.length >= 3;
tokenPos = 2;
@@ -1048,7 +1066,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
tokenMetadata.addBootstrapToken(token, endpoint);
calculatePendingRanges();
- if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+ if (usesHostId(endpoint))
tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
}
@@ -1067,7 +1085,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// versions < 1.2 .....: STATUS,TOKEN
// versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
int tokensPos;
- if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+ if (usesHostId(endpoint))
{
assert pieces.length >= 3;
tokensPos = 2;
@@ -1084,7 +1102,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.info("Node " + endpoint + " state jump to normal");
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
- if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+ if (usesHostId(endpoint))
tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
// we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.