You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by ja...@apache.org on 2013/06/27 22:34:09 UTC

git commit: Race condition in detecting version on a mixed 1.1/1.2 cluster; patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692

Updated Branches:
  refs/heads/cassandra-1.2 9668535de -> 296da81fc


Race condition in detecting version on a mixed 1.1/1.2 cluster
patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/296da81f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/296da81f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/296da81f

Branch: refs/heads/cassandra-1.2
Commit: 296da81fc809f71e8e08bda612ba89925880fb6c
Parents: 9668535
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Jun 27 12:54:25 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jun 27 13:32:50 2013 -0700

----------------------------------------------------------------------
 .../cassandra/net/OutboundTcpConnection.java    | 60 +++++++++++++++++++-
 1 file changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/296da81f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 7077922..648123b 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -27,7 +27,10 @@ import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -50,6 +53,8 @@ public class OutboundTcpConnection extends Thread
     private volatile boolean isStopped = false;
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+    private static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
+    private static final int NO_VERSION = Integer.MIN_VALUE;
 
     // sending thread reads from "active" (one of queue1, queue2) until it is empty.
     // then it swaps it with "backlog."
@@ -288,11 +293,10 @@ public class OutboundTcpConnection extends Thread
         if (logger.isDebugEnabled())
             logger.debug("attempting to connect to " + poolReference.endPoint());
 
-        targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
-
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
         {
+            targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
             try
             {
                 socket = poolReference.newSocket();
@@ -325,7 +329,16 @@ public class OutboundTcpConnection extends Thread
                     out.flush();
 
                     DataInputStream in = new DataInputStream(socket.getInputStream());
-                    int maxTargetVersion = in.readInt();
+                    int maxTargetVersion = handshakeVersion(in);
+                    if (maxTargetVersion == NO_VERSION) 
+                    {
+                        // no version is returned, so disconnect an try again: we will either get
+                        // a different target version (targetVersion < MessagingService.VERSION_12)
+                        // or if the same version the handshake will finally succeed
+                        logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
+                        disconnect();
+                        continue;
+                    }
                     if (targetVersion > maxTargetVersion)
                     {
                         logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
@@ -371,6 +384,47 @@ public class OutboundTcpConnection extends Thread
         }
         return false;
     }
+    /** Reads the peer's max messaging version via inputStream.readInt() on a helper thread, waiting at most WAIT_FOR_VERSION_MAX_TIME ms; returns NO_VERSION if the read fails or has not completed when the wait ends. */
+    private int handshakeVersion(final DataInputStream inputStream)
+    {
+        final AtomicInteger version = new AtomicInteger(NO_VERSION); // stays NO_VERSION unless readInt() succeeds
+        final CountDownLatch versionLatch = new CountDownLatch(1); // released by the helper thread once its read attempt ends
+        new Thread("HANDSHAKE-" + poolReference.endPoint()) // separate thread so the blocking readInt() can be bounded by a timeout
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    logger.info("Handshaking version with {}", poolReference.endPoint()); // NOTE(review): INFO on every attempt may be noisy; consider DEBUG
+                    version.set(inputStream.readInt());
+                }
+                catch (IOException ex) 
+                {
+                    final String msg = "Cannot handshake version with " + poolReference.endPoint();
+                    if (logger.isTraceEnabled()) // stack trace only at TRACE; otherwise a one-line INFO message
+                        logger.trace(msg, ex);
+                    else
+                        logger.info(msg);
+                }
+                finally
+                {
+                    // release the waiting caller whether the read succeeded or failed
+                    versionLatch.countDown();
+                }
+            }
+        }.start(); // NOTE(review): thread is neither joined nor stopped here; after a timeout it may stay blocked in readInt()
+
+        try
+        {
+            versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS); // boolean result ignored: on timeout we return whatever 'version' holds
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex); // NOTE(review): interrupt status is not restored before rethrowing
+        }
+        return version.get();
+    }
 
     private void expireMessages()
     {