You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/06/20 04:12:40 UTC

git commit: Gossip protocol version, use it to determine if new host id should be used. Patch by brandonwilliams, reviewed by Vijay for CASSANDRA-4317

Updated Branches:
  refs/heads/trunk bbcbfd865 -> e6530cc37


Gossip protocol version, use it to determine if new host id should be
used.
Patch by brandonwilliams, reviewed by Vijay for CASSANDRA-4317


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6530cc3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6530cc3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6530cc3

Branch: refs/heads/trunk
Commit: e6530cc3723a7d2fdc84400bf8cd722474eed589
Parents: bbcbfd8
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jun 19 21:10:38 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jun 19 21:10:38 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/ApplicationState.java |    1 +
 .../org/apache/cassandra/gms/VersionedValue.java   |    7 ++++
 .../org/apache/cassandra/net/MessagingService.java |    5 +++
 .../apache/cassandra/service/StorageService.java   |   26 ++++++++++++--
 4 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 4520426..518aa80 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -29,6 +29,7 @@ public enum ApplicationState
     INTERNAL_IP,
     RPC_ADDRESS,
     SEVERITY,
+    NET_VERSION,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 61bcbe5..ff41fcb 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -64,6 +64,8 @@ public class VersionedValue implements Comparable<VersionedValue>
 
     // values for ApplicationState.REMOVAL_COORDINATOR
     public final static String REMOVAL_COORDINATOR = "REMOVER";
+    // network proto version from MS
+    public final static String NET_VERSION = "NET_VERSION";
 
     public final int version;
     public final String value;
@@ -184,6 +186,11 @@ public class VersionedValue implements Comparable<VersionedValue>
         {
             return new VersionedValue(FBUtilities.getReleaseVersionString());
         }
+        
+        public VersionedValue networkVersion()
+        {
+            return new VersionedValue(String.valueOf(MessagingService.current_version));
+        }
 
         public VersionedValue internalIP(String private_ip)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 9c92402..c0af377 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -798,6 +798,11 @@ public final class MessagingService implements MessagingServiceMBean
         return getVersion(InetAddress.getByName(address));
     }
 
+    public boolean knowsVersion(InetAddress endpoint)
+    {
+        return versions.get(endpoint) != null;
+    }
+
     public void incrementDroppedMessages(Verb verb)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0455b1d..012f2ec 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -318,6 +318,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         setMode(Mode.CLIENT, false);
         Gossiper.instance.register(this);
         Gossiper.instance.start((int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
+        Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
 
         // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply).
@@ -465,6 +466,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.register(this);
         Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
+        // gossip network proto version
+        Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
         // gossip schema version when gossiper is running
         Schema.instance.updateVersionAndAnnounce();
         // add rpc listening info
@@ -1005,6 +1008,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     }
 
     /**
+     * Checks MS for the version, provided MS _really_ knows it (has directly communicated with the node); otherwise falls back to checking the gossiped version (learned about this node indirectly).
+     * If both fail, the node is too old to use hostid-style status serialization.
+     * @param endpoint
+     * @return boolean whether or not to use hostid
+     */
+    private boolean usesHostId(InetAddress endpoint)
+    {
+        if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+            return true;
+        else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12)
+            return true;
+        return false;
+    }
+
+    /**
      * Handle node bootstrap
      *
      * @param endpoint bootstrapping node
@@ -1018,7 +1036,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         //   versions  < 1.2 .....: STATUS,TOKEN
         //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
         int tokenPos;
-        if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
         {
             assert pieces.length >= 3;
             tokenPos = 2;
@@ -1048,7 +1066,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         tokenMetadata.addBootstrapToken(token, endpoint);
         calculatePendingRanges();
 
-        if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
     }
 
@@ -1067,7 +1085,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         //   versions  < 1.2 .....: STATUS,TOKEN
         //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
         int tokensPos;
-        if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
         {
             assert pieces.length >= 3;
             tokensPos = 2;
@@ -1084,7 +1102,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger.info("Node " + endpoint + " state jump to normal");
 
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
-        if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
 
         // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.