You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/03/01 23:34:47 UTC
[01/10] cassandra git commit: Avoid race on receiver by starting
streaming sender thread after sending init message
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 9bbb44973 -> 06feaefba
refs/heads/cassandra-3.0 218743dc3 -> 815508f1c
refs/heads/cassandra-3.11 be0deb90e -> e345d4e7b
refs/heads/trunk 21b4cae61 -> f32521808
Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb
Branch: refs/heads/cassandra-2.2
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
* Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = session.createConnection();
- incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(incomingSocket, true);
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = session.createConnection();
- outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(outgoingSocket, false);
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
}
/**
@@ -159,13 +157,15 @@ public class ConnectionHandler
protected int protocolVersion;
protected Socket socket;
+ private final boolean isOutgoingHandler;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
private IncomingStreamingConnection incomingConnection;
- protected MessageHandler(StreamSession session)
+ protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
{
this.session = session;
+ this.isOutgoingHandler = isOutgoingHandler;
}
protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
}
@SuppressWarnings("resource")
- public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+ public void sendInitMessage() throws IOException
{
StreamInitMessage message = new StreamInitMessage(
FBUtilities.getBroadcastAddress(),
session.sessionIndex(),
session.planId(),
session.description(),
- isForOutgoing,
+ !isOutgoingHandler,
session.keepSSTableLevel(),
session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
out.flush();
}
- public void start(IncomingStreamingConnection connection, int protocolVersion)
+ public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
{
this.incomingConnection = connection;
- start(connection.socket, protocolVersion);
+ start(connection.socket, protocolVersion, false);
}
- public void start(Socket socket, int protocolVersion)
+ public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
new Thread(this, name() + "-" + session.peer).start();
}
@@ -270,7 +272,7 @@ public class ConnectionHandler
{
IncomingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, false);
}
protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
OutgoingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, true);
}
protected String name()
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by pa...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815508f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815508f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815508f1
Branch: refs/heads/cassandra-3.11
Commit: 815508f1c240bbbac8246be56e15b1d09c1301b8
Parents: 218743d 06feaef
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:16 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:16 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4622ab,ca1aa27..ec3d826
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
-2.2.10
+3.0.12
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
- * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
* Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b83c089,fe551a8..aa1c615
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -215,8 -213,10 +215,10 @@@ public class ConnectionHandler
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by pa...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815508f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815508f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815508f1
Branch: refs/heads/trunk
Commit: 815508f1c240bbbac8246be56e15b1d09c1301b8
Parents: 218743d 06feaef
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:16 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:16 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4622ab,ca1aa27..ec3d826
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
-2.2.10
+3.0.12
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
- * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
* Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b83c089,fe551a8..aa1c615
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -215,8 -213,10 +215,10 @@@ public class ConnectionHandler
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
[03/10] cassandra git commit: Avoid race on receiver by starting
streaming sender thread after sending init message
Posted by pa...@apache.org.
Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb
Branch: refs/heads/cassandra-3.11
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
* Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = session.createConnection();
- incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(incomingSocket, true);
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = session.createConnection();
- outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(outgoingSocket, false);
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
}
/**
@@ -159,13 +157,15 @@ public class ConnectionHandler
protected int protocolVersion;
protected Socket socket;
+ private final boolean isOutgoingHandler;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
private IncomingStreamingConnection incomingConnection;
- protected MessageHandler(StreamSession session)
+ protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
{
this.session = session;
+ this.isOutgoingHandler = isOutgoingHandler;
}
protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
}
@SuppressWarnings("resource")
- public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+ public void sendInitMessage() throws IOException
{
StreamInitMessage message = new StreamInitMessage(
FBUtilities.getBroadcastAddress(),
session.sessionIndex(),
session.planId(),
session.description(),
- isForOutgoing,
+ !isOutgoingHandler,
session.keepSSTableLevel(),
session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
out.flush();
}
- public void start(IncomingStreamingConnection connection, int protocolVersion)
+ public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
{
this.incomingConnection = connection;
- start(connection.socket, protocolVersion);
+ start(connection.socket, protocolVersion, false);
}
- public void start(Socket socket, int protocolVersion)
+ public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
new Thread(this, name() + "-" + session.peer).start();
}
@@ -270,7 +272,7 @@ public class ConnectionHandler
{
IncomingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, false);
}
protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
OutgoingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, true);
}
protected String name()
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by pa...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815508f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815508f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815508f1
Branch: refs/heads/cassandra-3.0
Commit: 815508f1c240bbbac8246be56e15b1d09c1301b8
Parents: 218743d 06feaef
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:16 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:16 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4622ab,ca1aa27..ec3d826
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
-2.2.10
+3.0.12
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
- * Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
* Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
* Fix failing COPY TO STDOUT (CASSANDRA-12497)
* Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815508f1/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index b83c089,fe551a8..aa1c615
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -215,8 -213,10 +215,10 @@@ public class ConnectionHandler
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by pa...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e345d4e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e345d4e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e345d4e7
Branch: refs/heads/cassandra-3.11
Commit: e345d4e7bc83e09e3c273981b5c95e5fb00d1df5
Parents: be0deb9 815508f
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:49 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:49 2017 -0300
----------------------------------------------------------------------
----------------------------------------------------------------------
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by pa...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e345d4e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e345d4e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e345d4e7
Branch: refs/heads/trunk
Commit: e345d4e7bc83e09e3c273981b5c95e5fb00d1df5
Parents: be0deb9 815508f
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:33:49 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:33:49 2017 -0300
----------------------------------------------------------------------
----------------------------------------------------------------------
[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3252180
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3252180
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3252180
Branch: refs/heads/trunk
Commit: f3252180863d0a2e78d1bfb5437506033a57f931
Parents: 21b4cae e345d4e
Author: Paulo Motta <pa...@apache.org>
Authored: Wed Mar 1 20:34:28 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:34:28 2017 -0300
----------------------------------------------------------------------
----------------------------------------------------------------------
[02/10] cassandra git commit: Avoid race on receiver by starting
streaming sender thread after sending init message
Posted by pa...@apache.org.
Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb
Branch: refs/heads/cassandra-3.0
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
* Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = session.createConnection();
- incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(incomingSocket, true);
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = session.createConnection();
- outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(outgoingSocket, false);
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
}
/**
@@ -159,13 +157,15 @@ public class ConnectionHandler
protected int protocolVersion;
protected Socket socket;
+ private final boolean isOutgoingHandler;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
private IncomingStreamingConnection incomingConnection;
- protected MessageHandler(StreamSession session)
+ protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
{
this.session = session;
+ this.isOutgoingHandler = isOutgoingHandler;
}
protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
}
@SuppressWarnings("resource")
- public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+ public void sendInitMessage() throws IOException
{
StreamInitMessage message = new StreamInitMessage(
FBUtilities.getBroadcastAddress(),
session.sessionIndex(),
session.planId(),
session.description(),
- isForOutgoing,
+ !isOutgoingHandler,
session.keepSSTableLevel(),
session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
out.flush();
}
- public void start(IncomingStreamingConnection connection, int protocolVersion)
+ public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
{
this.incomingConnection = connection;
- start(connection.socket, protocolVersion);
+ start(connection.socket, protocolVersion, false);
}
- public void start(Socket socket, int protocolVersion)
+ public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
new Thread(this, name() + "-" + session.peer).start();
}
@@ -270,7 +272,7 @@ public class ConnectionHandler
{
IncomingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, false);
}
protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
OutgoingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, true);
}
protected String name()
[04/10] cassandra git commit: Avoid race on receiver by starting
streaming sender thread after sending init message
Posted by pa...@apache.org.
Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb
Branch: refs/heads/trunk
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
* Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = session.createConnection();
- incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(incomingSocket, true);
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = session.createConnection();
- outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(outgoingSocket, false);
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
}
/**
@@ -159,13 +157,15 @@ public class ConnectionHandler
protected int protocolVersion;
protected Socket socket;
+ private final boolean isOutgoingHandler;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
private IncomingStreamingConnection incomingConnection;
- protected MessageHandler(StreamSession session)
+ protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
{
this.session = session;
+ this.isOutgoingHandler = isOutgoingHandler;
}
protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
}
@SuppressWarnings("resource")
- public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+ public void sendInitMessage() throws IOException
{
StreamInitMessage message = new StreamInitMessage(
FBUtilities.getBroadcastAddress(),
session.sessionIndex(),
session.planId(),
session.description(),
- isForOutgoing,
+ !isOutgoingHandler,
session.keepSSTableLevel(),
session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
out.flush();
}
- public void start(IncomingStreamingConnection connection, int protocolVersion)
+ public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
{
this.incomingConnection = connection;
- start(connection.socket, protocolVersion);
+ start(connection.socket, protocolVersion, false);
}
- public void start(Socket socket, int protocolVersion)
+ public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
new Thread(this, name() + "-" + session.peer).start();
}
@@ -270,7 +272,7 @@ public class ConnectionHandler
{
IncomingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, false);
}
protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
OutgoingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, true);
}
protected String name()