You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/03/01 23:34:49 UTC

[03/10] cassandra git commit: Avoid race on receiver by starting streaming sender thread after sending init message

Avoid race on receiver by starting streaming sender thread after sending init message

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb

Branch: refs/heads/cassandra-3.11
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
  * Coalescing strategy sleeps too much (CASSANDRA-13090)
  * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
     }
 
     /**
@@ -159,13 +157,15 @@ public class ConnectionHandler
 
         protected int protocolVersion;
         protected Socket socket;
+        private final boolean isOutgoingHandler;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        public void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new Thread(this, name() + "-" + session.peer).start();
         }
@@ -270,7 +272,7 @@ public class ConnectionHandler
     {
         IncomingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, false);
         }
 
         protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()