You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2013/06/27 22:39:02 UTC

[1/2] git commit: Race condition in detecting version on a mixed 1.1/1.2 cluster patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692

Updated Branches:
  refs/heads/trunk b1f3fc00f -> 4cda3622d


Race condition in detecting version on a mixed 1.1/1.2 cluster
patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/296da81f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/296da81f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/296da81f

Branch: refs/heads/trunk
Commit: 296da81fc809f71e8e08bda612ba89925880fb6c
Parents: 9668535
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Jun 27 12:54:25 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jun 27 13:32:50 2013 -0700

----------------------------------------------------------------------
 .../cassandra/net/OutboundTcpConnection.java    | 60 +++++++++++++++++++-
 1 file changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/296da81f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 7077922..648123b 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -27,7 +27,10 @@ import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -50,6 +53,8 @@ public class OutboundTcpConnection extends Thread
     private volatile boolean isStopped = false;
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+    private static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
+    private static final int NO_VERSION = Integer.MIN_VALUE;
 
     // sending thread reads from "active" (one of queue1, queue2) until it is empty.
     // then it swaps it with "backlog."
@@ -288,11 +293,10 @@ public class OutboundTcpConnection extends Thread
         if (logger.isDebugEnabled())
             logger.debug("attempting to connect to " + poolReference.endPoint());
 
-        targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
-
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
         {
+            targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
             try
             {
                 socket = poolReference.newSocket();
@@ -325,7 +329,16 @@ public class OutboundTcpConnection extends Thread
                     out.flush();
 
                     DataInputStream in = new DataInputStream(socket.getInputStream());
-                    int maxTargetVersion = in.readInt();
+                    int maxTargetVersion = handshakeVersion(in);
+                    if (maxTargetVersion == NO_VERSION) 
+                    {
+                        // no version is returned, so disconnect an try again: we will either get
+                        // a different target version (targetVersion < MessagingService.VERSION_12)
+                        // or if the same version the handshake will finally succeed
+                        logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
+                        disconnect();
+                        continue;
+                    }
                     if (targetVersion > maxTargetVersion)
                     {
                         logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
@@ -371,6 +384,47 @@ public class OutboundTcpConnection extends Thread
         }
         return false;
     }
+    
+    private int handshakeVersion(final DataInputStream inputStream)
+    {
+        final AtomicInteger version = new AtomicInteger(NO_VERSION);
+        final CountDownLatch versionLatch = new CountDownLatch(1);
+        new Thread("HANDSHAKE-" + poolReference.endPoint())
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    logger.info("Handshaking version with {}", poolReference.endPoint());
+                    version.set(inputStream.readInt());
+                }
+                catch (IOException ex) 
+                {
+                    final String msg = "Cannot handshake version with " + poolReference.endPoint();
+                    if (logger.isTraceEnabled())
+                        logger.trace(msg, ex);
+                    else
+                        logger.info(msg);
+                }
+                finally
+                {
+                    //unblock the waiting thread on either success or fail
+                    versionLatch.countDown();
+                }
+            }
+        }.start();
+
+        try
+        {
+            versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
+        }
+        return version.get();
+    }
 
     private void expireMessages()
     {


[2/2] git commit: Merge branch 'cassandra-1.2' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-1.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/net/OutboundTcpConnection.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cda3622
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cda3622
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cda3622

Branch: refs/heads/trunk
Commit: 4cda3622d0a86d45d73a31576fc8b39b9e66928d
Parents: b1f3fc0 296da81
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Jun 27 13:37:27 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jun 27 13:37:27 2013 -0700

----------------------------------------------------------------------
 .../cassandra/net/OutboundTcpConnection.java    | 59 +++++++++++++++++++-
 1 file changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cda3622/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4c6f498,648123b..1bdead2
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -285,12 -293,10 +289,11 @@@ public class OutboundTcpConnection exte
          if (logger.isDebugEnabled())
              logger.debug("attempting to connect to " + poolReference.endPoint());
  
-         targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
- 
 -        long start = System.currentTimeMillis();
 -        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
 +        long start = System.nanoTime();
 +        long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
 +        while (System.nanoTime() - start < timeout)
          {
+             targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
              try
              {
                  socket = poolReference.newSocket();
@@@ -316,35 -322,47 +319,44 @@@
                  }
                  out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
  
 -                if (targetVersion >= MessagingService.VERSION_12)
 -                {
 -                    out.writeInt(MessagingService.PROTOCOL_MAGIC);
 -                    writeHeader(out, targetVersion, shouldCompressConnection());
 -                    out.flush();
 +                out.writeInt(MessagingService.PROTOCOL_MAGIC);
 +                writeHeader(out, targetVersion, shouldCompressConnection());
 +                out.flush();
  
 -                    DataInputStream in = new DataInputStream(socket.getInputStream());
 -                    int maxTargetVersion = handshakeVersion(in);
 -                    if (maxTargetVersion == NO_VERSION) 
 -                    {
 -                        // no version is returned, so disconnect an try again: we will either get
 -                        // a different target version (targetVersion < MessagingService.VERSION_12)
 -                        // or if the same version the handshake will finally succeed
 -                        logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
 -                        disconnect();
 -                        continue;
 -                    }
 -                    if (targetVersion > maxTargetVersion)
 -                    {
 -                        logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
 -                        MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
 -                        disconnect();
 -                        return false;
 -                    }
 +                DataInputStream in = new DataInputStream(socket.getInputStream());
-                 int maxTargetVersion = in.readInt();
++                int maxTargetVersion = handshakeVersion(in);
++                if (maxTargetVersion == NO_VERSION)
++                {
++                    // no version is returned, so disconnect an try again: we will either get
++                    // a different target version (targetVersion < MessagingService.VERSION_12)
++                    // or if the same version the handshake will finally succeed
++                    logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
++                    disconnect();
++                    continue;
++                }
 +                if (targetVersion > maxTargetVersion)
 +                {
 +                    logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
 +                    MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
 +                    disconnect();
 +                    return false;
 +                }
  
 -                    if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
 -                    {
 -                        logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
 -                                     maxTargetVersion, targetVersion);
 -                        MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
 -                        softCloseSocket();
 -                    }
 +                if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
 +                {
 +                    logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
 +                                 maxTargetVersion, targetVersion);
 +                    MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
 +                    softCloseSocket();
 +                }
  
 -                    out.writeInt(MessagingService.current_version);
 -                    CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out);
 -                    if (shouldCompressConnection())
 -                    {
 -                        out.flush();
 -                        logger.trace("Upgrading OutputStream to be compressed");
 -                        out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
 -                    }
 +                out.writeInt(MessagingService.current_version);
 +                CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out);
 +                if (shouldCompressConnection())
 +                {
 +                    out.flush();
 +                    logger.trace("Upgrading OutputStream to be compressed");
 +                    out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
                  }
  
                  return true;