You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/15 22:24:52 UTC

cassandra git commit: Establish bootstrap stream sessions sequentially

Repository: cassandra
Updated Branches:
  refs/heads/trunk 07c6a36cc -> a018bcb7d


Establish bootstrap stream sessions sequentially

patch by Paulo Motta; reviewed by yukim for CASSANDRA-6992


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a018bcb7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a018bcb7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a018bcb7

Branch: refs/heads/trunk
Commit: a018bcb7d7ae74f9fa3f33cce2f0cc0deed6a442
Parents: 07c6a36
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Nov 18 16:00:36 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 15 15:24:22 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/BootStrapper.java  |  3 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |  5 +-
 .../cassandra/io/sstable/SSTableLoader.java     |  2 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  2 +-
 .../cassandra/repair/StreamingRepairTask.java   |  2 +-
 .../cassandra/service/StorageService.java       |  3 +-
 .../cassandra/streaming/StreamCoordinator.java  | 57 +++++++++++++++++++-
 .../apache/cassandra/streaming/StreamPlan.java  | 12 +++--
 .../cassandra/streaming/StreamResultFuture.java |  9 ++--
 .../apache/cassandra/dht/BootStrapperTest.java  |  2 +-
 11 files changed, 80 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 991d42a..fb6f7e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
  * Sort compactionhistory output by timestamp (CASSANDRA-10464)
  * More efficient BTree removal (CASSANDRA-9991)
  * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 8d8f5c7..d10aa3b 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -74,7 +74,8 @@ public class BootStrapper extends ProgressEventNotifierSupport
                                                    "Bootstrap",
                                                    useStrictConsistency,
                                                    DatabaseDescriptor.getEndpointSnitch(),
-                                                   stateStore);
+                                                   stateStore,
+                                                   true);
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
 
         for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 3da6bc8..47e0c15 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -117,13 +117,14 @@ public class RangeStreamer
                          String description,
                          boolean useStrictConsistency,
                          IEndpointSnitch snitch,
-                         StreamStateStore stateStore)
+                         StreamStateStore stateStore,
+                         boolean connectSequentially)
     {
         this.metadata = metadata;
         this.tokens = tokens;
         this.address = address;
         this.description = description;
-        this.streamPlan = new StreamPlan(description, true);
+        this.streamPlan = new StreamPlan(description, true, connectSequentially);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;
         this.stateStore = stateStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 3286522..043f6fa 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index daace01..e1da497 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -73,7 +73,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
             isIncremental = prs.isIncremental;
         }
         Tracing.traceRepair(message);
-        new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 25ef06e..b6936b6 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -62,7 +62,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
             ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
             isIncremental = prs.isIncremental;
         }
-        new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4e749dc..069af53 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1106,7 +1106,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                        "Rebuild",
                                                        !replacing && useStrictConsistency,
                                                        DatabaseDescriptor.getEndpointSnitch(),
-                                                       streamStateStore);
+                                                       streamStateStore,
+                                                       false);
             streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
             if (sourceDc != null)
                 streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 603366d..aac1671 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -41,19 +41,23 @@ public class StreamCoordinator
     // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
     private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
                                                                                                                             FBUtilities.getAvailableProcessors());
+    private final boolean connectSequentially;
 
     private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
     private final boolean keepSSTableLevel;
     private final boolean isIncremental;
+    private Iterator<StreamSession> sessionsToConnect = null;
 
-    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory)
+    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental,
+                             StreamConnectionFactory factory, boolean connectSequentially)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
         this.isIncremental = isIncremental;
+        this.connectSequentially = connectSequentially;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -89,12 +93,61 @@ public class StreamCoordinator
         return connectionsPerHost == 0;
     }
 
-    public void connectAllStreamSessions()
+    public void connect(StreamResultFuture future)
+    {
+        if (this.connectSequentially)
+            connectSequentially(future);
+        else
+            connectAllStreamSessions();
+    }
+
+    private void connectAllStreamSessions()
     {
         for (HostStreamingData data : peerSessions.values())
             data.connectAllStreamSessions();
     }
 
+    private void connectSequentially(StreamResultFuture future)
+    {
+        sessionsToConnect = getAllStreamSessions().iterator();
+        future.addEventListener(new StreamEventHandler()
+        {
+            public void handleStreamEvent(StreamEvent event)
+            {
+                if (event.eventType == StreamEvent.Type.STREAM_PREPARED)
+                {
+                    connectNext();
+                }
+            }
+
+            public void onSuccess(StreamState result)
+            {
+
+            }
+
+            public void onFailure(Throwable t)
+            {
+
+            }
+        });
+        connectNext();
+    }
+
+    private void connectNext()
+    {
+        if (sessionsToConnect == null)
+            return;
+
+        if (sessionsToConnect.hasNext())
+        {
+            StreamSession next = sessionsToConnect.next();
+            logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress());
+            streamExecutor.execute(new StreamSessionConnector(next));
+        }
+        else
+            logger.debug("Finished connecting all sessions");
+    }
+
     public synchronized Set<InetAddress> getPeers()
     {
         return new HashSet<>(peerSessions.keySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 0d963ed..f0fdd55 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,19 +47,21 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false);
     }
 
-    public StreamPlan(String description, boolean keepSSTableLevels)
+    public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially);
     }
 
-    public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental)
+    public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels,
+                      boolean isIncremental, boolean connectSequentially)
     {
         this.description = description;
         this.repairedAt = repairedAt;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory());
+        this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(),
+                                                 connectSequentially);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 99adab0..2297c83 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,10 +71,12 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 
     private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental)
     {
-        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory()));
+        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental,
+                                                        new DefaultConnectionFactory(), false));
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
+    static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners,
+                                   StreamCoordinator coordinator)
     {
         StreamResultFuture future = createAndRegister(planId, description, coordinator);
         if (listeners != null)
@@ -90,7 +92,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         {
             session.init(future);
         }
-        coordinator.connectAllStreamSessions();
+
+        coordinator.connect(future);
 
         return future;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a018bcb7/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 9b1fa01..8454ec1 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -93,7 +93,7 @@ public class BootStrapperTest
         InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
 
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore());
+        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false);
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddress ep)