You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/01/20 19:49:34 UTC

cassandra git commit: Parallelize streaming of different keyspaces for bootstrap and rebuild

Repository: cassandra
Updated Branches:
  refs/heads/trunk d3704d8a0 -> 4d67639d3


Parallelize streaming of different keyspaces for bootstrap and rebuild

patch by Corentin Chary; reviewed by jasobrown for CASSANDRA-4663


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4d67639d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4d67639d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4d67639d

Branch: refs/heads/trunk
Commit: 4d67639d38b3e3a6fd0a3487a99b9755abda469d
Parents: d3704d8
Author: Corentin Chary <c....@criteo.com>
Authored: Wed Dec 7 11:11:06 2016 +0100
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 20 11:47:18 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 conf/cassandra.yaml                                          | 6 ++++++
 src/java/org/apache/cassandra/config/Config.java             | 1 +
 src/java/org/apache/cassandra/config/DatabaseDescriptor.java | 5 +++++
 src/java/org/apache/cassandra/dht/BootStrapper.java          | 3 ++-
 src/java/org/apache/cassandra/dht/RangeStreamer.java         | 7 +++++--
 src/java/org/apache/cassandra/service/StorageService.java    | 3 ++-
 7 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebf161d..86ecbc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Parallelize streaming of different keyspaces (CASSANDRA-4663)
  * Improved compactions metrics (CASSANDRA-13015)
  * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
  * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 4891706..e728796 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -817,6 +817,12 @@ cross_node_timeout: false
 # times out in 10 minutes by default
 # streaming_keep_alive_period_in_secs: 300
 
+# Limit the number of connections per host for streaming.
+# Increase this when you notice that joins are CPU-bound rather than network
+# bound (for example, a few nodes with big files).
+# streaming_connections_per_host: 1
+
+
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.
 # phi_convict_threshold: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f5a8722..6fb999e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -101,6 +101,7 @@ public class Config
     @Deprecated
     public int streaming_socket_timeout_in_ms = 86400000; //24 hours
 
+    public Integer streaming_connections_per_host = 1;
     public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes
 
     public boolean cross_node_timeout = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c43672a..5aa7065 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2030,6 +2030,11 @@ public class DatabaseDescriptor
         return conf.streaming_keep_alive_period_in_secs;
     }
 
+    public static int getStreamingConnectionsPerHost()
+    {
+        return conf.streaming_connections_per_host;
+    }
+
     public static String getLocalDataCenter()
     {
         return localDC;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 1e00f48..15e75fe 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -77,7 +77,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
                                                    useStrictConsistency,
                                                    DatabaseDescriptor.getEndpointSnitch(),
                                                    stateStore,
-                                                   true);
+                                                   true,
+                                                   DatabaseDescriptor.getStreamingConnectionsPerHost());
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
         streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 504ef7e..46ca779 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,13 +148,15 @@ public class RangeStreamer
                          boolean useStrictConsistency,
                          IEndpointSnitch snitch,
                          StreamStateStore stateStore,
-                         boolean connectSequentially)
+                         boolean connectSequentially,
+                         int connectionsPerHost)
     {
         this.metadata = metadata;
         this.tokens = tokens;
         this.address = address;
         this.description = description;
-        this.streamPlan = new StreamPlan(description, true, connectSequentially);
+        this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
+                true, false, connectSequentially);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;
         this.stateStore = stateStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ad1d978..6cb67fc 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1136,7 +1136,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                        useStrictConsistency && !replacing,
                                                        DatabaseDescriptor.getEndpointSnitch(),
                                                        streamStateStore,
-                                                       false);
+                                                       false,
+                                                       DatabaseDescriptor.getStreamingConnectionsPerHost());
             streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
             if (sourceDc != null)
                 streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));