You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/01/20 19:49:34 UTC
cassandra git commit: Parallelize streaming of different keyspaces
for bootstrap and rebuild
Repository: cassandra
Updated Branches:
refs/heads/trunk d3704d8a0 -> 4d67639d3
Parallelize streaming of different keyspaces for bootstrap and rebuild
patch by Corentin Chary; reviewed by jasobrown for CASSANDRA-4663
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4d67639d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4d67639d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4d67639d
Branch: refs/heads/trunk
Commit: 4d67639d38b3e3a6fd0a3487a99b9755abda469d
Parents: d3704d8
Author: Corentin Chary <c....@criteo.com>
Authored: Wed Dec 7 11:11:06 2016 +0100
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 20 11:47:18 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 6 ++++++
src/java/org/apache/cassandra/config/Config.java | 1 +
src/java/org/apache/cassandra/config/DatabaseDescriptor.java | 5 +++++
src/java/org/apache/cassandra/dht/BootStrapper.java | 3 ++-
src/java/org/apache/cassandra/dht/RangeStreamer.java | 7 +++++--
src/java/org/apache/cassandra/service/StorageService.java | 3 ++-
7 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebf161d..86ecbc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Parallelize streaming of different keyspaces (CASSANDRA-4663)
* Improved compactions metrics (CASSANDRA-13015)
* Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
* Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 4891706..e728796 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -817,6 +817,12 @@ cross_node_timeout: false
# times out in 10 minutes by default
# streaming_keep_alive_period_in_secs: 300
+# Limit number of connections per host for streaming
+# Increase this when you notice that joins are CPU-bound rather than network
+# bound (for example a few nodes with big files).
+# streaming_connections_per_host: 1
+
+
# phi value that must be reached for a host to be marked down.
# most users should never need to adjust this.
# phi_convict_threshold: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f5a8722..6fb999e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -101,6 +101,7 @@ public class Config
@Deprecated
public int streaming_socket_timeout_in_ms = 86400000; //24 hours
+ public Integer streaming_connections_per_host = 1;
public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes
public boolean cross_node_timeout = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c43672a..5aa7065 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2030,6 +2030,11 @@ public class DatabaseDescriptor
return conf.streaming_keep_alive_period_in_secs;
}
+ public static int getStreamingConnectionsPerHost()
+ {
+ return conf.streaming_connections_per_host;
+ }
+
public static String getLocalDataCenter()
{
return localDC;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 1e00f48..15e75fe 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -77,7 +77,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
useStrictConsistency,
DatabaseDescriptor.getEndpointSnitch(),
stateStore,
- true);
+ true,
+ DatabaseDescriptor.getStreamingConnectionsPerHost());
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 504ef7e..46ca779 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,13 +148,15 @@ public class RangeStreamer
boolean useStrictConsistency,
IEndpointSnitch snitch,
StreamStateStore stateStore,
- boolean connectSequentially)
+ boolean connectSequentially,
+ int connectionsPerHost)
{
this.metadata = metadata;
this.tokens = tokens;
this.address = address;
this.description = description;
- this.streamPlan = new StreamPlan(description, true, connectSequentially);
+ this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
+ true, false, connectSequentially);
this.useStrictConsistency = useStrictConsistency;
this.snitch = snitch;
this.stateStore = stateStore;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4d67639d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ad1d978..6cb67fc 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1136,7 +1136,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
useStrictConsistency && !replacing,
DatabaseDescriptor.getEndpointSnitch(),
streamStateStore,
- false);
+ false,
+ DatabaseDescriptor.getStreamingConnectionsPerHost());
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));