You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2013/06/27 22:39:02 UTC
[1/2] git commit: Race condition in detecting version on a mixed
1.1/1.2 cluster patch by Sergio Bossa;
reviewed by jasobrown for CASSANDRA-5692
Updated Branches:
refs/heads/trunk b1f3fc00f -> 4cda3622d
Race condition in detecting version on a mixed 1.1/1.2 cluster
patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/296da81f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/296da81f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/296da81f
Branch: refs/heads/trunk
Commit: 296da81fc809f71e8e08bda612ba89925880fb6c
Parents: 9668535
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Jun 27 12:54:25 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jun 27 13:32:50 2013 -0700
----------------------------------------------------------------------
.../cassandra/net/OutboundTcpConnection.java | 60 +++++++++++++++++++-
1 file changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/296da81f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 7077922..648123b 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -27,7 +27,10 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -50,6 +53,8 @@ public class OutboundTcpConnection extends Thread
private volatile boolean isStopped = false;
private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+ private static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
+ private static final int NO_VERSION = Integer.MIN_VALUE;
// sending thread reads from "active" (one of queue1, queue2) until it is empty.
// then it swaps it with "backlog."
@@ -288,11 +293,10 @@ public class OutboundTcpConnection extends Thread
if (logger.isDebugEnabled())
logger.debug("attempting to connect to " + poolReference.endPoint());
- targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
-
long start = System.currentTimeMillis();
while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
{
+ targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
try
{
socket = poolReference.newSocket();
@@ -325,7 +329,16 @@ public class OutboundTcpConnection extends Thread
out.flush();
DataInputStream in = new DataInputStream(socket.getInputStream());
- int maxTargetVersion = in.readInt();
+ int maxTargetVersion = handshakeVersion(in);
+ if (maxTargetVersion == NO_VERSION)
+ {
+ // no version is returned, so disconnect and try again: we will either get
+ // a different target version (targetVersion < MessagingService.VERSION_12)
+ // or if the same version the handshake will finally succeed
+ logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
+ disconnect();
+ continue;
+ }
if (targetVersion > maxTargetVersion)
{
logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
@@ -371,6 +384,47 @@ public class OutboundTcpConnection extends Thread
}
return false;
}
+
+ private int handshakeVersion(final DataInputStream inputStream)
+ {
+ final AtomicInteger version = new AtomicInteger(NO_VERSION);
+ final CountDownLatch versionLatch = new CountDownLatch(1);
+ new Thread("HANDSHAKE-" + poolReference.endPoint())
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
+ }
+ }
+ }.start();
+
+ try
+ {
+ versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
+ }
+ return version.get();
+ }
private void expireMessages()
{
[2/2] git commit: Merge branch 'cassandra-1.2' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cda3622
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cda3622
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cda3622
Branch: refs/heads/trunk
Commit: 4cda3622d0a86d45d73a31576fc8b39b9e66928d
Parents: b1f3fc0 296da81
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Jun 27 13:37:27 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jun 27 13:37:27 2013 -0700
----------------------------------------------------------------------
.../cassandra/net/OutboundTcpConnection.java | 59 +++++++++++++++++++-
1 file changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cda3622/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4c6f498,648123b..1bdead2
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -285,12 -293,10 +289,11 @@@ public class OutboundTcpConnection exte
if (logger.isDebugEnabled())
logger.debug("attempting to connect to " + poolReference.endPoint());
- targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
-
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
+ long start = System.nanoTime();
+ long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
+ while (System.nanoTime() - start < timeout)
{
+ targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
try
{
socket = poolReference.newSocket();
@@@ -316,35 -322,47 +319,44 @@@
}
out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
- if (targetVersion >= MessagingService.VERSION_12)
- {
- out.writeInt(MessagingService.PROTOCOL_MAGIC);
- writeHeader(out, targetVersion, shouldCompressConnection());
- out.flush();
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ writeHeader(out, targetVersion, shouldCompressConnection());
+ out.flush();
- DataInputStream in = new DataInputStream(socket.getInputStream());
- int maxTargetVersion = handshakeVersion(in);
- if (maxTargetVersion == NO_VERSION)
- {
- // no version is returned, so disconnect and try again: we will either get
- // a different target version (targetVersion < MessagingService.VERSION_12)
- // or if the same version the handshake will finally succeed
- logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
- disconnect();
- continue;
- }
- if (targetVersion > maxTargetVersion)
- {
- logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
- MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
- disconnect();
- return false;
- }
+ DataInputStream in = new DataInputStream(socket.getInputStream());
- int maxTargetVersion = in.readInt();
++ int maxTargetVersion = handshakeVersion(in);
++ if (maxTargetVersion == NO_VERSION)
++ {
++ // no version is returned, so disconnect and try again: we will either get
++ // a different target version (targetVersion < MessagingService.VERSION_12)
++ // or if the same version the handshake will finally succeed
++ logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
++ disconnect();
++ continue;
++ }
+ if (targetVersion > maxTargetVersion)
+ {
+ logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
+ MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
+ disconnect();
+ return false;
+ }
- if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
- {
- logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
- maxTargetVersion, targetVersion);
- MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
- softCloseSocket();
- }
+ if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
+ {
+ logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
+ maxTargetVersion, targetVersion);
+ MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
+ softCloseSocket();
+ }
- out.writeInt(MessagingService.current_version);
- CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out);
- if (shouldCompressConnection())
- {
- out.flush();
- logger.trace("Upgrading OutputStream to be compressed");
- out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
- }
+ out.writeInt(MessagingService.current_version);
+ CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out);
+ if (shouldCompressConnection())
+ {
+ out.flush();
+ logger.trace("Upgrading OutputStream to be compressed");
+ out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
}
return true;