You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2013/06/27 22:34:09 UTC
git commit: Race condition in detecting version on a mixed 1.1/1.2 cluster
patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692
Updated Branches:
refs/heads/cassandra-1.2 9668535de -> 296da81fc
Race condition in detecting version on a mixed 1.1/1.2 cluster
patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/296da81f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/296da81f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/296da81f
Branch: refs/heads/cassandra-1.2
Commit: 296da81fc809f71e8e08bda612ba89925880fb6c
Parents: 9668535
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Jun 27 12:54:25 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jun 27 13:32:50 2013 -0700
----------------------------------------------------------------------
.../cassandra/net/OutboundTcpConnection.java | 60 +++++++++++++++++++-
1 file changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/296da81f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 7077922..648123b 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -27,7 +27,10 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -50,6 +53,8 @@ public class OutboundTcpConnection extends Thread
private volatile boolean isStopped = false;
private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+ private static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
+ private static final int NO_VERSION = Integer.MIN_VALUE;
// sending thread reads from "active" (one of queue1, queue2) until it is empty.
// then it swaps it with "backlog."
@@ -288,11 +293,10 @@ public class OutboundTcpConnection extends Thread
if (logger.isDebugEnabled())
logger.debug("attempting to connect to " + poolReference.endPoint());
- targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
-
long start = System.currentTimeMillis();
while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
{
+ targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
try
{
socket = poolReference.newSocket();
@@ -325,7 +329,16 @@ public class OutboundTcpConnection extends Thread
out.flush();
DataInputStream in = new DataInputStream(socket.getInputStream());
- int maxTargetVersion = in.readInt();
+ int maxTargetVersion = handshakeVersion(in);
+ if (maxTargetVersion == NO_VERSION)
+ {
+ // no version is returned, so disconnect and try again: we will either get
+ // a different target version (targetVersion < MessagingService.VERSION_12)
+ // or if the same version the handshake will finally succeed
+ logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
+ disconnect();
+ continue;
+ }
if (targetVersion > maxTargetVersion)
{
logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
@@ -371,6 +384,47 @@ public class OutboundTcpConnection extends Thread
}
return false;
}
+
+ private int handshakeVersion(final DataInputStream inputStream) // reads the peer's version int on a helper thread; returns NO_VERSION if the read fails or does not finish in time
+ {
+ final AtomicInteger version = new AtomicInteger(NO_VERSION); // stays NO_VERSION unless readInt() below succeeds
+ final CountDownLatch versionLatch = new CountDownLatch(1); // released by the helper thread once its read attempt ends
+ new Thread("HANDSHAKE-" + poolReference.endPoint()) // the read runs on its own thread so the wait below can be time-bounded
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt()); // blocking read; the value is published only if the read completes
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex); // the exception itself is logged only at trace level
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
+ }
+ }
+ }.start();
+
+ try
+ {
+ versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS); // result ignored: on timeout version is simply still NO_VERSION
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex); // NOTE(review): presumably the calling thread is never expected to be interrupted here - confirm
+ }
+ return version.get(); // NOTE(review): after a timeout the helper thread may still be blocked in readInt(); the caller calls disconnect() on NO_VERSION, which presumably unblocks it - confirm
+ }
private void expireMessages()
{