You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2012/02/06 05:15:13 UTC
[5/8] git commit: Add optional socket timeout for streaming
Add optional socket timeout for streaming
patch by vijay2win; reviewed by slebresne for CASSANDRA-3838
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a35f8787
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a35f8787
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a35f8787
Branch: refs/heads/trunk
Commit: a35f8787cfbfb2ad60ee7795c104ee05661227dd
Parents: 0074d64
Author: Sylvain Lebresne <sy...@riptano.com>
Authored: Sun Feb 5 22:23:51 2012 +0100
Committer: Sylvain Lebresne <sy...@riptano.com>
Committed: Sun Feb 5 22:23:51 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 7 +++++++
conf/cassandra.yaml | 7 +++++++
src/java/org/apache/cassandra/config/Config.java | 2 ++
.../cassandra/config/DatabaseDescriptor.java | 5 +++++
.../apache/cassandra/streaming/FileStreamTask.java | 1 +
.../cassandra/streaming/IncomingStreamReader.java | 1 +
7 files changed, 24 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b201961..83eca8a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
readability (CASSANDRA-3726)
* synchronize BiMap of bootstrapping tokens (CASSANDRA-3417)
* show index options in CLI (CASSANDRA-3809)
+ * add optional socket timeout for streaming (CASSANDRA-3838)
Merged from 0.8:
* (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily
case (CASSANDRA-3251)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 2c29606..e36b9e8 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -8,6 +8,13 @@ upgrade, just in case you need to roll back to the previous version.
(Cassandra version X + 1 will always be able to read data files created
by version X, but the inverse is not necessarily the case.)
+1.0.8
+=====
+
+Other
+-----
+ - Allow configuring socket timeout for streaming
+
1.0.7
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2082db0..209bcb8 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -315,6 +315,13 @@ compaction_preheat_key_cache: true
# Time to wait for a reply from other nodes before failing the command
rpc_timeout_in_ms: 10000
+# Enable socket timeout for streaming operation.
+# When a timeout occurs during streaming, streaming is retried from the start
+# of the current file. This *can* involve re-streaming an important amount of
+# data, so you should avoid setting the value too low.
+# Default value is 0, which never timeout streams.
+# streaming_socket_timeout_in_ms: 0
+
# phi value that must be reached for a host to be marked down.
# most users should never need to adjust this.
# phi_convict_threshold: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5480c..7cff37f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -50,6 +50,8 @@ public class Config
public Long rpc_timeout_in_ms = new Long(2000);
+ public Integer streaming_socket_timeout_in_ms = new Integer(0);
+
public Integer phi_convict_threshold = 8;
public Integer concurrent_reads = 8;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 03b5175..5aa59e4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1008,4 +1008,9 @@ public class DatabaseDescriptor
{
return conf.commitlog_total_space_in_mb;
}
+
+ public static int getStreamingSocketTimeout()
+ {
+ return conf.streaming_socket_timeout_in_ms;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 9411b16..ffb1388 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -197,6 +197,7 @@ public class FileStreamTask extends WrappedRunnable
try
{
socket = MessagingService.instance().getConnectionPool(to).newSocket();
+ socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
output = socket.getOutputStream();
break;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a35f8787/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index e2a618f..8ade06a 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -55,6 +55,7 @@ public class IncomingStreamReader
public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
{
+ socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
this.socket = socket;
InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);