You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/10 20:04:35 UTC
[03/15] cassandra git commit: Fix streaming_socket_timeout_in_ms not enforced
Fix streaming_socket_timeout_in_ms not enforced
Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11286
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/561000aa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/561000aa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/561000aa
Branch: refs/heads/trunk
Commit: 561000aa3094699bab29766d9644ff50f6cb74f3
Parents: e94a2a0
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Feb 12 12:17:01 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 10 12:54:24 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 7 +++----
.../net/IncomingStreamingConnection.java | 7 ++++++-
.../cassandra/streaming/ConnectionHandler.java | 21 +++++++++++++-------
.../cassandra/streaming/StreamSession.java | 2 ++
5 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7c997a..4b505f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.14
+ * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
* Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
* COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
* InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 1fa04e6..0da4800 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -630,10 +630,9 @@ request_timeout_in_ms: 10000
# and the times are synchronized between the nodes.
cross_node_timeout: false
-# Enable socket timeout for streaming operation.
-# When a timeout occurs during streaming, streaming is retried from the start
-# of the current file. This _can_ involve re-streaming an important amount of
-# data, so you should avoid setting the value too low.
+# Set socket timeout for streaming operation.
+# The stream session is failed if no data is received by any of the
+# participants within that period.
# Default value is 3600000, which means streams timeout after an hour.
# streaming_socket_timeout_in_ms: 3600000
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 1f98bc4..5ced786 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -62,6 +63,10 @@ public class IncomingStreamingConnection extends Thread implements Closeable
DataInput input = new DataInputStream(socket.getInputStream());
StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version);
+ //Set SO_TIMEOUT on follower side
+ if (!init.isForOutgoing)
+ socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+
// The initiator makes two connections, one for incoming and one for outgoing.
// The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
@@ -74,7 +79,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
close();
}
}
-
+
@Override
public void close()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index ac267f9..52268b2 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -105,12 +105,22 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);
- ListenableFuture<?> inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close();
- ListenableFuture<?> outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
+ ListenableFuture<?> inClosed = closeIncoming();
+ ListenableFuture<?> outClosed = closeOutgoing();
return Futures.allAsList(inClosed, outClosed);
}
+ public ListenableFuture<?> closeOutgoing()
+ {
+ return outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
+ }
+
+ public ListenableFuture<?> closeIncoming()
+ {
+ return incoming == null ? Futures.immediateFuture(null) : incoming.close();
+ }
+
/**
* Enqueue messages to be sent.
*
@@ -165,11 +175,8 @@ public class ConnectionHandler
protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
{
- ReadableByteChannel in = socket.getChannel();
- // socket channel is null when encrypted(SSL)
- return in == null
- ? Channels.newChannel(socket.getInputStream())
- : in;
+ //we do this instead of socket.getChannel() so socketSoTimeout is respected
+ return Channels.newChannel(socket.getInputStream());
}
public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 98a6f1f..642e837 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -609,6 +609,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
else
{
state(State.WAIT_COMPLETE);
+ handler.closeIncoming();
}
}
@@ -696,6 +697,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
handler.sendMessage(new CompleteMessage());
completeSent = true;
state(State.WAIT_COMPLETE);
+ handler.closeOutgoing();
}
}
return completed;