You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/03/01 23:34:49 UTC
[03/10] cassandra git commit: Avoid race on receiver by starting
streaming sender thread after sending init message
Avoid race on receiver by starting streaming sender thread after sending init message
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12886
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06feaefb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06feaefb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06feaefb
Branch: refs/heads/cassandra-3.11
Commit: 06feaefba50301734c490521d720c8a482f638e4
Parents: 9bbb449
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Mar 1 20:25:32 2017 -0300
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Mar 1 20:30:30 2017 -0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 26 +++++++++++---------
2 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404440a..ca1aa27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
* Make sure compaction stats are updated when compaction is interrupted (Backport from 3.0, CASSANDRA-12100)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06feaefb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..fe551a8 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -82,13 +82,11 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = session.createConnection();
- incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(incomingSocket, true);
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = session.createConnection();
- outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(outgoingSocket, false);
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
}
/**
@@ -159,13 +157,15 @@ public class ConnectionHandler
protected int protocolVersion;
protected Socket socket;
+ private final boolean isOutgoingHandler;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
private IncomingStreamingConnection incomingConnection;
- protected MessageHandler(StreamSession session)
+ protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
{
this.session = session;
+ this.isOutgoingHandler = isOutgoingHandler;
}
protected abstract String name();
@@ -187,14 +187,14 @@ public class ConnectionHandler
}
@SuppressWarnings("resource")
- public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+ public void sendInitMessage() throws IOException
{
StreamInitMessage message = new StreamInitMessage(
FBUtilities.getBroadcastAddress(),
session.sessionIndex(),
session.planId(),
session.description(),
- isForOutgoing,
+ !isOutgoingHandler,
session.keepSSTableLevel(),
session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -203,16 +203,18 @@ public class ConnectionHandler
out.flush();
}
- public void start(IncomingStreamingConnection connection, int protocolVersion)
+ public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
{
this.incomingConnection = connection;
- start(connection.socket, protocolVersion);
+ start(connection.socket, protocolVersion, false);
}
- public void start(Socket socket, int protocolVersion)
+ public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
{
this.socket = socket;
this.protocolVersion = protocolVersion;
+ if (initiator)
+ sendInitMessage();
new Thread(this, name() + "-" + session.peer).start();
}
@@ -270,7 +272,7 @@ public class ConnectionHandler
{
IncomingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, false);
}
protected String name()
@@ -330,7 +332,7 @@ public class ConnectionHandler
OutgoingMessageHandler(StreamSession session)
{
- super(session);
+ super(session, true);
}
protected String name()