You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2012/02/06 05:15:13 UTC

[5/8] git commit: Add optional socket timeout for streaming

Add optional socket timeout for streaming

patch by vijay2win; reviewed by slebresne for CASSANDRA-3838


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a35f8787
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a35f8787
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a35f8787

Branch: refs/heads/trunk
Commit: a35f8787cfbfb2ad60ee7795c104ee05661227dd
Parents: 0074d64
Author: Sylvain Lebresne <sy...@riptano.com>
Authored: Sun Feb 5 22:23:51 2012 +0100
Committer: Sylvain Lebresne <sy...@riptano.com>
Committed: Sun Feb 5 22:23:51 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 NEWS.txt                                           |    7 +++++++
 conf/cassandra.yaml                                |    7 +++++++
 src/java/org/apache/cassandra/config/Config.java   |    2 ++
 .../cassandra/config/DatabaseDescriptor.java       |    5 +++++
 .../apache/cassandra/streaming/FileStreamTask.java |    1 +
 .../cassandra/streaming/IncomingStreamReader.java  |    1 +
 7 files changed, 24 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b201961..83eca8a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
    readability (CASSANDRA-3726)
  * synchronize BiMap of bootstrapping tokens (CASSANDRA-3417)
  * show index options in CLI (CASSANDRA-3809)
+ * add optional socket timeout for streaming (CASSANDRA-3838)
 Merged from 0.8:
  * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily
    case (CASSANDRA-3251)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 2c29606..e36b9e8 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -8,6 +8,13 @@ upgrade, just in case you need to roll back to the previous version.
 (Cassandra version X + 1 will always be able to read data files created
 by version X, but the inverse is not necessarily the case.)
 
+1.0.8
+=====
+
+Other
+-----
+    - Allow configuring socket timeout for streaming
+
 1.0.7
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2082db0..209bcb8 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -315,6 +315,13 @@ compaction_preheat_key_cache: true
 # Time to wait for a reply from other nodes before failing the command 
 rpc_timeout_in_ms: 10000
 
+# Enable socket timeout for streaming operation.
+# When a timeout occurs during streaming, streaming is retried from the start
+# of the current file. This *can* involve re-streaming a significant amount of
+# data, so you should avoid setting the value too low.
+# Default value is 0, which means streams never time out.
+# streaming_socket_timeout_in_ms: 0
+
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.
 # phi_convict_threshold: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5480c..7cff37f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -50,6 +50,8 @@ public class Config
     
     public Long rpc_timeout_in_ms = new Long(2000);
 
+    public Integer streaming_socket_timeout_in_ms = new Integer(0);
+
     public Integer phi_convict_threshold = 8;
     
     public Integer concurrent_reads = 8;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 03b5175..5aa59e4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1008,4 +1008,9 @@ public class DatabaseDescriptor
     {
         return conf.commitlog_total_space_in_mb;
     }
+
+    public static int getStreamingSocketTimeout()
+    {
+        return conf.streaming_socket_timeout_in_ms;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 9411b16..ffb1388 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -197,6 +197,7 @@ public class FileStreamTask extends WrappedRunnable
             try
             {
                 socket = MessagingService.instance().getConnectionPool(to).newSocket();
+                socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
                 output = socket.getOutputStream();
                 break;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index e2a618f..8ade06a 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -55,6 +55,7 @@ public class IncomingStreamReader
 
     public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
+        socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
         this.socket = socket;
         InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
         session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);