You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/03/01 23:34:47 UTC

[01/10] cassandra git commit: Avoid race on receiver by starting streaming sender thread after sending init message

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 9bbb44973 -> 06feaefba
  refs/heads/cassandra-3.0 218743dc3 -> 815508f1c
  refs/heads/cassandra-3.11 be0deb90e -> e345d4e7b
  refs/heads/trunk 21b4cae61 -> f32521808


Avoid race on receiver by starting streaming sender thread after sending init message

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb

Branch: refs/heads/cassandra-2.2
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
  * Coalescing strategy sleeps too much (CASSANDRA-13090)
  * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
     }
 
     /**
@@ -159,13 +157,15 @@ public class ConnectionHandler
 
         protected int protocolVersion;
         protected Socket socket;
+        private final boolean isOutgoingHandler;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        public void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new Thread(this, name() + "-" + session.peer).start();
         }
@@ -270,7 +272,7 @@ public class ConnectionHandler
     {
         IncomingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, false);
         }
 
         protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by pa...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815508f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815508f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815508f1

Branch: refs/heads/cassandra-3.11
Commit: 815508f1c240bbbac8246be56e15b1d09c1301b8
Parents: 218743d 06feaef
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:16 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:16 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4622ab,ca1aa27..ec3d826
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
 -2.2.10
 +3.0.12
+  * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)
 - * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
   * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
   * Fix failing COPY TO STDOUT (CASSANDRA-12497)
   * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b83c089,fe551a8..aa1c615
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -215,8 -213,10 +215,10 @@@ public class ConnectionHandle
          {
              this.socket = socket;
              this.protocolVersion = protocolVersion;
+             if (initiator)
+                 sendInitMessage();
  
 -            new Thread(this, name() + "-" + session.peer).start();
 +            new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
          }
  
          public ListenableFuture<?> close()


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by pa...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815508f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815508f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815508f1

Branch: refs/heads/trunk
Commit: 815508f1c240bbbac8246be56e15b1d09c1301b8
Parents: 218743d 06feaef
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:16 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:16 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4622ab,ca1aa27..ec3d826
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
 -2.2.10
 +3.0.12
+  * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)
 - * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
   * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
   * Fix failing COPY TO STDOUT (CASSANDRA-12497)
   * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b83c089,fe551a8..aa1c615
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -215,8 -213,10 +215,10 @@@ public class ConnectionHandle
          {
              this.socket = socket;
              this.protocolVersion = protocolVersion;
+             if (initiator)
+                 sendInitMessage();
  
 -            new Thread(this, name() + "-" + session.peer).start();
 +            new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
          }
  
          public ListenableFuture<?> close()


[03/10] cassandra git commit: Avoid race on receiver by starting streaming sender thread after sending init message

Posted by pa...@apache.org.
Avoid race on receiver by starting streaming sender thread after sending init message

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb

Branch: refs/heads/cassandra-3.11
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
  * Coalescing strategy sleeps too much (CASSANDRA-13090)
  * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
     }
 
     /**
@@ -159,13 +157,15 @@ public class ConnectionHandler
 
         protected int protocolVersion;
         protected Socket socket;
+        private final boolean isOutgoingHandler;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        public void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new Thread(this, name() + "-" + session.peer).start();
         }
@@ -270,7 +272,7 @@ public class ConnectionHandler
     {
         IncomingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, false);
         }
 
         protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by pa...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815508f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815508f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815508f1

Branch: refs/heads/cassandra-3.0
Commit: 815508f1c240bbbac8246be56e15b1d09c1301b8
Parents: 218743d 06feaef
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:16 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:16 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4622ab,ca1aa27..ec3d826
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
 -2.2.10
 +3.0.12
+  * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)
 - * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
   * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
   * Fix failing COPY TO STDOUT (CASSANDRA-12497)
   * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b83c089,fe551a8..aa1c615
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -215,8 -213,10 +215,10 @@@ public class ConnectionHandle
          {
              this.socket = socket;
              this.protocolVersion = protocolVersion;
+             if (initiator)
+                 sendInitMessage();
  
 -            new Thread(this, name() + "-" + session.peer).start();
 +            new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
          }
  
          public ListenableFuture<?> close()


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by pa...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e345d4e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e345d4e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e345d4e7

Branch: refs/heads/cassandra-3.11
Commit: e345d4e7bc83e09e3c273981b5c95e5fb00d1df5
Parents: be0deb9 815508f
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:49 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:49 2017 -0300

----------------------------------------------------------------------

----------------------------------------------------------------------



[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by pa...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e345d4e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e345d4e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e345d4e7

Branch: refs/heads/trunk
Commit: e345d4e7bc83e09e3c273981b5c95e5fb00d1df5
Parents: be0deb9 815508f
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:49 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:49 2017 -0300

----------------------------------------------------------------------

----------------------------------------------------------------------



[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3252180
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3252180
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3252180

Branch: refs/heads/trunk
Commit: f3252180863d0a2e78d1bfb5437506033a57f931
Parents: 21b4cae e345d4e
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:34:28 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:34:28 2017 -0300

----------------------------------------------------------------------

----------------------------------------------------------------------



[02/10] cassandra git commit: Avoid race on receiver by starting streaming sender thread after sending init message

Posted by pa...@apache.org.
Avoid race on receiver by starting streaming sender thread after sending init message

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb

Branch: refs/heads/cassandra-3.0
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
  * Coalescing strategy sleeps too much (CASSANDRA-13090)
  * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
     }
 
     /**
@@ -159,13 +157,15 @@ public class ConnectionHandler
 
         protected int protocolVersion;
         protected Socket socket;
+        private final boolean isOutgoingHandler;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        public void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new Thread(this, name() + "-" + session.peer).start();
         }
@@ -270,7 +272,7 @@ public class ConnectionHandler
     {
         IncomingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, false);
         }
 
         protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()


[04/10] cassandra git commit: Avoid race on receiver by starting streaming sender thread after sending init message

Posted by pa...@apache.org.
Avoid race on receiver by starting streaming sender thread after sending init message

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb

Branch: refs/heads/trunk
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  | 26 +++++++++++---------
 2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
  * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
  * Coalescing strategy sleeps too much (CASSANDRA-13090)
  * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
     }
 
     /**
@@ -159,13 +157,15 @@ public class ConnectionHandler
 
         protected int protocolVersion;
         protected Socket socket;
+        private final boolean isOutgoingHandler;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        public void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new Thread(this, name() + "-" + session.peer).start();
         }
@@ -270,7 +272,7 @@ public class ConnectionHandler
     {
         IncomingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, false);
         }
 
         protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()