You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/15 22:24:52 UTC
cassandra git commit: Establish bootstrap stream sessions sequentially
Repository: cassandra
Updated Branches:
refs/heads/trunk 07c6a36cc -> a018bcb7d
Establish bootstrap stream sessions sequentially
patch by Paulo Motta; reviewed by yukim for CASSANDRA-6992
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a018bcb7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a018bcb7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a018bcb7
Branch: refs/heads/trunk
Commit: a018bcb7d7ae74f9fa3f33cce2f0cc0deed6a442
Parents: 07c6a36
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Nov 18 16:00:36 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 15 15:24:22 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/dht/BootStrapper.java | 3 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 5 +-
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../apache/cassandra/repair/LocalSyncTask.java | 2 +-
.../cassandra/repair/StreamingRepairTask.java | 2 +-
.../cassandra/service/StorageService.java | 3 +-
.../cassandra/streaming/StreamCoordinator.java | 57 +++++++++++++++++++-
.../apache/cassandra/streaming/StreamPlan.java | 12 +++--
.../cassandra/streaming/StreamResultFuture.java | 9 ++--
.../apache/cassandra/dht/BootStrapperTest.java | 2 +-
11 files changed, 80 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 991d42a..fb6f7e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
* Sort compactionhistory output by timestamp (CASSANDRA-10464)
* More efficient BTree removal (CASSANDRA-9991)
* Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 8d8f5c7..d10aa3b 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -74,7 +74,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
"Bootstrap",
useStrictConsistency,
DatabaseDescriptor.getEndpointSnitch(),
- stateStore);
+ stateStore,
+ true);
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 3da6bc8..47e0c15 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -117,13 +117,14 @@ public class RangeStreamer
String description,
boolean useStrictConsistency,
IEndpointSnitch snitch,
- StreamStateStore stateStore)
+ StreamStateStore stateStore,
+ boolean connectSequentially)
{
this.metadata = metadata;
this.tokens = tokens;
this.address = address;
this.description = description;
- this.streamPlan = new StreamPlan(description, true);
+ this.streamPlan = new StreamPlan(description, true, connectSequentially);
this.useStrictConsistency = useStrictConsistency;
this.snitch = snitch;
this.stateStore = stateStore;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 3286522..043f6fa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false).connectionFactory(client.getConnectionFactory());
+ StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false).connectionFactory(client.getConnectionFactory());
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index daace01..e1da497 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -73,7 +73,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
isIncremental = prs.isIncremental;
}
Tracing.traceRepair(message);
- new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
+ new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 25ef06e..b6936b6 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -62,7 +62,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
isIncremental = prs.isIncremental;
}
- new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
+ new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4e749dc..069af53 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1106,7 +1106,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
"Rebuild",
!replacing && useStrictConsistency,
DatabaseDescriptor.getEndpointSnitch(),
- streamStateStore);
+ streamStateStore,
+ false);
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 603366d..aac1671 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -41,19 +41,23 @@ public class StreamCoordinator
// streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
FBUtilities.getAvailableProcessors());
+ private final boolean connectSequentially;
private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
private final int connectionsPerHost;
private StreamConnectionFactory factory;
private final boolean keepSSTableLevel;
private final boolean isIncremental;
+ private Iterator<StreamSession> sessionsToConnect = null;
- public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory)
+ public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental,
+ StreamConnectionFactory factory, boolean connectSequentially)
{
this.connectionsPerHost = connectionsPerHost;
this.factory = factory;
this.keepSSTableLevel = keepSSTableLevel;
this.isIncremental = isIncremental;
+ this.connectSequentially = connectSequentially;
}
public void setConnectionFactory(StreamConnectionFactory factory)
@@ -89,12 +93,61 @@ public class StreamCoordinator
return connectionsPerHost == 0;
}
- public void connectAllStreamSessions()
+ public void connect(StreamResultFuture future)
+ {
+ if (this.connectSequentially)
+ connectSequentially(future);
+ else
+ connectAllStreamSessions();
+ }
+
+ private void connectAllStreamSessions()
{
for (HostStreamingData data : peerSessions.values())
data.connectAllStreamSessions();
}
+ private void connectSequentially(StreamResultFuture future)
+ {
+ sessionsToConnect = getAllStreamSessions().iterator();
+ future.addEventListener(new StreamEventHandler()
+ {
+ public void handleStreamEvent(StreamEvent event)
+ {
+ if (event.eventType == StreamEvent.Type.STREAM_PREPARED)
+ {
+ connectNext();
+ }
+ }
+
+ public void onSuccess(StreamState result)
+ {
+
+ }
+
+ public void onFailure(Throwable t)
+ {
+
+ }
+ });
+ connectNext();
+ }
+
+ private void connectNext()
+ {
+ if (sessionsToConnect == null)
+ return;
+
+ if (sessionsToConnect.hasNext())
+ {
+ StreamSession next = sessionsToConnect.next();
+ logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress());
+ streamExecutor.execute(new StreamSessionConnector(next));
+ }
+ else
+ logger.debug("Finished connecting all sessions");
+ }
+
public synchronized Set<InetAddress> getPeers()
{
return new HashSet<>(peerSessions.keySet());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 0d963ed..f0fdd55 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,19 +47,21 @@ public class StreamPlan
*/
public StreamPlan(String description)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false);
}
- public StreamPlan(String description, boolean keepSSTableLevels)
+ public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially);
}
- public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental)
+ public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
+ boolean isIncremental, boolean connectSequentially)
{
this.description = description;
this.repairedAt = repairedAt;
- this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory());
+ this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
+ connectSequentially);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 99adab0..2297c83 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,10 +71,12 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental)
{
- this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory()));
+ this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental,
+ new DefaultConnectionFactory(), false));
}
- static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
+ static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners,
+ StreamCoordinator coordinator)
{
StreamResultFuture future = createAndRegister(planId, description, coordinator);
if (listeners != null)
@@ -90,7 +92,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
{
session.init(future);
}
- coordinator.connectAllStreamSessions();
+
+ coordinator.connect(future);
return future;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 9b1fa01..8454ec1 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -93,7 +93,7 @@ public class BootStrapperTest
InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
assertEquals(numOldNodes, tmd.sortedTokens().size());
- RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore());
+ RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false);
IFailureDetector mockFailureDetector = new IFailureDetector()
{
public boolean isAlive(InetAddress ep)