You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/28 19:50:43 UTC

svn commit: r1140751 - in /cassandra/branches/cassandra-0.8: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/

Author: jbellis
Date: Tue Jun 28 17:50:43 2011
New Revision: 1140751

URL: http://svn.apache.org/viewvc?rev=1140751&view=rev
Log:
fix Message version propagation from old nodes to new ones
patch by brandonwilliams and jbellis for CASSANDRA-2818

Modified:
    cassandra/branches/cassandra-0.8/   (props changed)
    cassandra/branches/cassandra-0.8/contrib/   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java

Propchange: cassandra/branches/cassandra-0.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1140565
+/cassandra/branches/cassandra-0.7:1026516-1140567
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0:1125021-1130369

Propchange: cassandra/branches/cassandra-0.8/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1140565
+/cassandra/branches/cassandra-0.7/contrib:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369

Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369

Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369

Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369

Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369

Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140567
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125041
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1140751&r1=1140750&r2=1140751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Tue Jun 28 17:50:43 2011
@@ -871,17 +871,13 @@ public class Gossiper implements IFailur
      */
     public void addSavedEndpoint(InetAddress ep)
     {
-        EndpointState epState = endpointStateMap.get(ep);
-        if (epState == null)
-        {
-            epState = new EndpointState(new HeartBeatState(0));
-            epState.markDead();
-            epState.setHasToken(true);
-            endpointStateMap.put(ep, epState);
-            unreachableEndpoints.put(ep, System.currentTimeMillis());
-            if (logger.isTraceEnabled())
-                logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
-        }
+        EndpointState epState = new EndpointState(new HeartBeatState(0));
+        epState.markDead();
+        epState.setHasToken(true);
+        endpointStateMap.put(ep, epState);
+        unreachableEndpoints.put(ep, System.currentTimeMillis());
+        if (logger.isTraceEnabled())
+            logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
     }
 
     public void addLocalApplicationState(ApplicationState state, VersionedValue value)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1140751&r1=1140750&r2=1140751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jun 28 17:50:43 2011
@@ -70,25 +70,43 @@ public class IncomingTcpConnection exten
                 // we should buffer
                 input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
             version = MessagingService.getBits(header, 15, 8);
-            Gossiper.instance.setVersion(socket.getInetAddress(), version);
+            if (logger.isDebugEnabled())
+                logger.debug("Version for " + socket.getInetAddress() + " is " + version);
         }
         catch (IOException e)
         {
             close();
             throw new IOError(e);
         }
+
+        if (version > MessagingService.version_)
+        {
+            // save the endpoint so gossip will reconnect to it
+            Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
+            logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
+
+            // streaming connections are per-session and have a fixed version.  we can't do anything with a new-version
+            // stream connection, so drop it.
+            if (isStream)
+            {
+                close();
+                return;
+            }
+            // for non-streaming connections, continue to read the messages (and ignore them) until sender
+            // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+        }
+        else
+        {
+            // only set version when <= to us, otherwise it's the responsibility of the other end to mimic us
+            Gossiper.instance.setVersion(socket.getInetAddress(), version);
+        }
+
         while (true)
         {
             try
             {
                 if (isStream)
                 {
-                    if (version > MessagingService.version_)
-                    {
-                        logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
-                        close();
-                        return;
-                    }
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
@@ -106,11 +124,8 @@ public class IncomingTcpConnection exten
                         input.readFully(contentBytes, offset, CHUNK_SIZE);
                     input.readFully(contentBytes, size - remainder, remainder);
 
-                    if (version > MessagingService.version_)
-                        logger.info("Received connection from newer protocol version. Ignorning message.");
-                    else
+                    if (version <= MessagingService.version_)
                     {
-                        // todo: need to be aware of message version.
                         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
                         String id = dis.readUTF();
                         Message message = Message.serializer().deserialize(dis, version);
@@ -120,9 +135,8 @@ public class IncomingTcpConnection exten
                 // prepare to read the next message
                 MessagingService.validateMagic(input.readInt());
                 int header = input.readInt();
-                version = MessagingService.getBits(header, 15, 8);
                 assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
-                assert version == MessagingService.getBits(header, 15, 8) : "Protocol version shouldn't change during a session";
+                version = MessagingService.getBits(header, 15, 8);
             }
             catch (EOFException e)
             {