You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/10 20:04:36 UTC

[04/15] cassandra git commit: Fix streaming_socket_timeout_in_ms not enforced

Fix streaming_socket_timeout_in_ms not enforced

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11286


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/561000aa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/561000aa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/561000aa

Branch: refs/heads/cassandra-3.0
Commit: 561000aa3094699bab29766d9644ff50f6cb74f3
Parents: e94a2a0
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Feb 12 12:17:01 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 10 12:54:24 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  7 +++----
 .../net/IncomingStreamingConnection.java        |  7 ++++++-
 .../cassandra/streaming/ConnectionHandler.java  | 21 +++++++++++++-------
 .../cassandra/streaming/StreamSession.java      |  2 ++
 5 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7c997a..4b505f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.14
+ * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
  * Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
  * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
  * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 1fa04e6..0da4800 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -630,10 +630,9 @@ request_timeout_in_ms: 10000
 # and the times are synchronized between the nodes.
 cross_node_timeout: false
 
-# Enable socket timeout for streaming operation.
-# When a timeout occurs during streaming, streaming is retried from the start
-# of the current file. This _can_ involve re-streaming an important amount of
-# data, so you should avoid setting the value too low.
+# Set the socket timeout for streaming operations.
+# The stream session fails if any of the participants receives no data
+# within that period.
 # Default value is 3600000, which means streams timeout after an hour.
 # streaming_socket_timeout_in_ms: 3600000
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 1f98bc4..5ced786 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -62,6 +63,10 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             DataInput input = new DataInputStream(socket.getInputStream());
             StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version);
 
+            // Set SO_TIMEOUT on the follower side
+            if (!init.isForOutgoing)
+                socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+
             // The initiator makes two connections, one for incoming and one for outgoing.
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
@@ -74,7 +79,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             close();
         }
     }
-    
+
     @Override
     public void close()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index ac267f9..52268b2 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -105,12 +105,22 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer);
 
-        ListenableFuture<?> inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close();
-        ListenableFuture<?> outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
+        ListenableFuture<?> inClosed = closeIncoming();
+        ListenableFuture<?> outClosed = closeOutgoing();
 
         return Futures.allAsList(inClosed, outClosed);
     }
 
+    public ListenableFuture<?> closeOutgoing()
+    {
+        return outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
+    }
+
+    public ListenableFuture<?> closeIncoming()
+    {
+        return incoming == null ? Futures.immediateFuture(null) : incoming.close();
+    }
+
     /**
      * Enqueue messages to be sent.
      *
@@ -165,11 +175,8 @@ public class ConnectionHandler
 
         protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
         {
-            ReadableByteChannel in = socket.getChannel();
-            // socket channel is null when encrypted(SSL)
-            return in == null
-                 ? Channels.newChannel(socket.getInputStream())
-                 : in;
+            // Wrap the socket's input stream instead of using socket.getChannel(), so that SO_TIMEOUT is respected
+            return Channels.newChannel(socket.getInputStream());
         }
 
         public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 98a6f1f..642e837 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -609,6 +609,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         else
         {
             state(State.WAIT_COMPLETE);
+            handler.closeIncoming();
         }
     }
 
@@ -696,6 +697,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 handler.sendMessage(new CompleteMessage());
                 completeSent = true;
                 state(State.WAIT_COMPLETE);
+                handler.closeOutgoing();
             }
         }
         return completed;