You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/05/02 00:53:17 UTC

[1/3] git commit: Parallel streaming for sstableloader

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 31cd61387 -> 44fa2cdb4
  refs/heads/trunk f0def3412 -> 0ed57bf53


Parallel streaming for sstableloader

patch by Joshua Mckenzie; reviewed by yukim for CASSANDRA-3668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44fa2cdb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44fa2cdb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44fa2cdb

Branch: refs/heads/cassandra-2.1
Commit: 44fa2cdb48cf6b4e589d5c48f5c57f0a93c03b60
Parents: 31cd613
Author: Joshua McKenzie <jo...@datastax.com>
Authored: Thu May 1 17:51:12 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 1 17:51:12 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/io/sstable/SSTableLoader.java     |   9 +-
 .../net/IncomingStreamingConnection.java        |   2 +-
 .../cassandra/repair/StreamingRepairTask.java   |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |   7 +-
 .../cassandra/streaming/ProgressInfo.java       |  10 +-
 .../apache/cassandra/streaming/SessionInfo.java |   3 +
 .../cassandra/streaming/StreamCoordinator.java  | 272 +++++++++++++++++++
 .../apache/cassandra/streaming/StreamEvent.java |   2 +
 .../apache/cassandra/streaming/StreamPlan.java  |  34 +--
 .../cassandra/streaming/StreamResultFuture.java |  80 +++---
 .../cassandra/streaming/StreamSession.java      |  42 ++-
 .../management/ProgressInfoCompositeData.java   |  21 +-
 .../management/SessionInfoCompositeData.java    |   1 +
 .../streaming/messages/StreamInitMessage.java   |   9 +-
 .../streaming/messages/StreamMessage.java       |   2 +-
 .../org/apache/cassandra/tools/BulkLoader.java  | 169 ++++++++----
 .../cassandra/streaming/SessionInfoTest.java    |   6 +-
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 19 files changed, 500 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fcdfc..86940ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc1
+ * Parallel streaming for sstableloader (CASSANDRA-3668)
 Merged from 2.0:
  * Add Google Compute Engine snitch (CASSANDRA-7132)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b14e203..bbb1277 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler
     private final File directory;
     private final String keyspace;
     private final Client client;
+    private final int connectionsPerHost;
     private final OutputHandler outputHandler;
     private final Set<InetAddress> failedHosts = new HashSet<>();
 
@@ -61,10 +62,16 @@ public class SSTableLoader implements StreamEventHandler
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
     {
+        this(directory, client, outputHandler, 1);
+    }
+
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
+    {
         this.directory = directory;
         this.keyspace = directory.getParentFile().getName();
         this.client = client;
         this.outputHandler = outputHandler;
+        this.connectionsPerHost = connectionsPerHost;
     }
 
     protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
@@ -150,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load");
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost);
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 20392f2..003bbf9 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 636568c..b9184ca 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -71,7 +71,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
             repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
 
         logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
-        StreamResultFuture op = new StreamPlan("Repair", repairedAt)
+        StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
                                     .flushBeforeTransfer(true)
                                     // request ranges from the remote node
                                     .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 562645d..5484c83 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -217,7 +217,12 @@ public class ConnectionHandler
 
         public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
         {
-            StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
+            StreamInitMessage message = new StreamInitMessage(
+                    FBUtilities.getBroadcastAddress(),
+                    session.sessionIndex(),
+                    session.planId(),
+                    session.description(),
+                    isForOutgoing);
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
             getWriteChannel(socket).write(messageBuf);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index d308ed0..fdd3e97 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -49,16 +49,18 @@ public class ProgressInfo implements Serializable
     }
 
     public final InetAddress peer;
+    public final int sessionIndex;
     public final String fileName;
     public final Direction direction;
     public final long currentBytes;
     public final long totalBytes;
 
-    public ProgressInfo(InetAddress peer, String fileName, Direction direction, long currentBytes, long totalBytes)
+    public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
     {
         assert totalBytes > 0;
 
         this.peer = peer;
+        this.sessionIndex = sessionIndex;
         this.fileName = fileName;
         this.direction = direction;
         this.currentBytes = currentBytes;
@@ -70,7 +72,7 @@ public class ProgressInfo implements Serializable
      */
     public boolean isCompleted()
     {
-        return currentBytes == totalBytes;
+        return currentBytes >= totalBytes;
     }
 
     /**
@@ -87,13 +89,14 @@ public class ProgressInfo implements Serializable
         if (totalBytes != that.totalBytes) return false;
         if (direction != that.direction) return false;
         if (!fileName.equals(that.fileName)) return false;
+        if (sessionIndex != that.sessionIndex) return false;
         return peer.equals(that.peer);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(peer, fileName, direction, totalBytes);
+        return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
     }
 
     @Override
@@ -104,6 +107,7 @@ public class ProgressInfo implements Serializable
         sb.append("/").append(totalBytes).append(" bytes");
         sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
         sb.append(direction == Direction.OUT ? "sent to " : "received from ");
+        sb.append("idx:").append(sessionIndex);
         sb.append(peer);
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index b722ecf..98e945b 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
 public final class SessionInfo implements Serializable
 {
     public final InetAddress peer;
+    public final int sessionIndex;
     /** Immutable collection of receiving summaries */
     public final Collection<StreamSummary> receivingSummaries;
     /** Immutable collection of sending summaries*/
@@ -44,11 +45,13 @@ public final class SessionInfo implements Serializable
     private final Map<String, ProgressInfo> sendingFiles;
 
     public SessionInfo(InetAddress peer,
+                       int sessionIndex,
                        Collection<StreamSummary> receivingSummaries,
                        Collection<StreamSummary> sendingSummaries,
                        StreamSession.State state)
     {
         this.peer = peer;
+        this.sessionIndex = sessionIndex;
         this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
         this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
         this.receivingFiles = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
new file mode 100644
index 0000000..425b5b1
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple
+ * StreamSession and ProgressInfo instances per peer.
+ *
+ * This class coordinates multiple SessionStreams per peer in both the outgoing StreamPlan context and on the
+ * inbound StreamResultFuture context.
+ */
+public class StreamCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
+
+    // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the
+    // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
+    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+                                                                                                                            FBUtilities.getAvailableProcessors());
+
+    private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
+    private final int connectionsPerHost;
+
+    public StreamCoordinator(int connectionsPerHost)
+    {
+        this.connectionsPerHost = connectionsPerHost;
+    }
+
+    /**
+     * @return true if any stream session is active
+     */
+    public synchronized boolean hasActiveSessions()
+    {
+        for (HostStreamingData data : peerSessions.values())
+        {
+            if (data.hasActiveSessions())
+                return true;
+        }
+        return false;
+    }
+
+    public synchronized Collection<StreamSession> getAllStreamSessions()
+    {
+        Collection<StreamSession> results = new ArrayList<>();
+        for (HostStreamingData data : peerSessions.values())
+        {
+            results.addAll(data.getAllStreamSessions());
+        }
+        return results;
+    }
+
+    public void connectAllStreamSessions()
+    {
+        for (HostStreamingData data : peerSessions.values())
+            data.connectAllStreamSessions();
+    }
+
+    public synchronized Set<InetAddress> getPeers()
+    {
+        return new HashSet<>(peerSessions.keySet());
+    }
+
+    public synchronized StreamSession getOrCreateNextSession(InetAddress peer)
+    {
+        return getOrCreateHostData(peer).getOrCreateNextSession(peer);
+    }
+
+    public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id)
+    {
+        return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
+    }
+
+    public synchronized void updateProgress(ProgressInfo info)
+    {
+        getHostData(info.peer).updateProgress(info);
+    }
+
+    public synchronized void addSessionInfo(SessionInfo session)
+    {
+        HostStreamingData data = getOrCreateHostData(session.peer);
+        data.addSessionInfo(session);
+    }
+
+    public synchronized Set<SessionInfo> getAllSessionInfo()
+    {
+        Set<SessionInfo> result = new HashSet<>();
+        for (HostStreamingData data : peerSessions.values())
+        {
+            result.addAll(data.getAllSessionInfo());
+        }
+        return result;
+    }
+
+    public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    {
+        HostStreamingData sessionList = getOrCreateHostData(to);
+
+        if (connectionsPerHost > 1)
+        {
+            List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
+
+            for (List<StreamSession.SSTableStreamingSections> subList : buckets)
+            {
+                StreamSession session = sessionList.getOrCreateNextSession(to);
+                session.addTransferFiles(subList);
+            }
+        }
+        else
+        {
+            StreamSession session = sessionList.getOrCreateNextSession(to);
+            session.addTransferFiles(sstableDetails);
+        }
+    }
+
+    private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    {
+        // There's no point in divvying things up into more buckets than we have sstableDetails
+        int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
+        int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
+        int index = 0;
+
+        List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
+        List<StreamSession.SSTableStreamingSections> slice = null;
+        for (StreamSession.SSTableStreamingSections streamSession : sstableDetails)
+        {
+            if (index % step == 0)
+            {
+                slice = new ArrayList<>();
+                result.add(slice);
+            }
+            slice.add(streamSession);
+            ++index;
+        }
+
+        return result;
+    }
+
+    private HostStreamingData getHostData(InetAddress peer)
+    {
+        HostStreamingData data = peerSessions.get(peer);
+        if (data == null)
+            throw new IllegalArgumentException("Unknown peer requested: " + peer.toString());
+        return data;
+    }
+
+    private HostStreamingData getOrCreateHostData(InetAddress peer)
+    {
+        HostStreamingData data = peerSessions.get(peer);
+        if (data == null)
+        {
+            data = new HostStreamingData();
+            peerSessions.put(peer, data);
+        }
+        return data;
+    }
+
+    private class StreamSessionConnector implements Runnable
+    {
+        private final StreamSession session;
+        public StreamSessionConnector(StreamSession session)
+        {
+            this.session = session;
+        }
+
+        public void run()
+        {
+            session.start();
+            logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
+        }
+    }
+
+    private class HostStreamingData
+    {
+        private Map<Integer, StreamSession> streamSessions = new HashMap<>();
+        private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
+
+        private int lastReturned = -1;
+
+        public boolean hasActiveSessions()
+        {
+            for (StreamSession session : streamSessions.values())
+            {
+                StreamSession.State state = session.state();
+                if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
+                    return true;
+            }
+            return false;
+        }
+
+        public StreamSession getOrCreateNextSession(InetAddress peer)
+        {
+            // create
+            if (streamSessions.size() < connectionsPerHost)
+            {
+                StreamSession session = new StreamSession(peer, streamSessions.size());
+                streamSessions.put(++lastReturned, session);
+                return session;
+            }
+            // get
+            else
+            {
+                if (lastReturned == streamSessions.size() - 1)
+                    lastReturned = 0;
+
+                return streamSessions.get(lastReturned++);
+            }
+        }
+
+        public void connectAllStreamSessions()
+        {
+            for (StreamSession session : streamSessions.values())
+            {
+                streamExecutor.execute(new StreamSessionConnector(session));
+            }
+        }
+
+        public Collection<StreamSession> getAllStreamSessions()
+        {
+            return Collections.unmodifiableCollection(streamSessions.values());
+        }
+
+        public StreamSession getOrCreateSessionById(InetAddress peer, int id)
+        {
+            StreamSession session = streamSessions.get(id);
+            if (session == null)
+            {
+                session = new StreamSession(peer, id);
+                streamSessions.put(id, session);
+            }
+            return session;
+        }
+
+        public void updateProgress(ProgressInfo info)
+        {
+            sessionInfos.get(info.sessionIndex).updateProgress(info);
+        }
+
+        public void addSessionInfo(SessionInfo info)
+        {
+            sessionInfos.put(info.sessionIndex, info);
+        }
+
+        public Collection<SessionInfo> getAllSessionInfo()
+        {
+            return sessionInfos.values();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 9af1fbd..8089323 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -42,12 +42,14 @@ public abstract class StreamEvent
     {
         public final InetAddress peer;
         public final boolean success;
+        public final int sessionIndex;
 
         public SessionCompleteEvent(StreamSession session)
         {
             super(Type.STREAM_COMPLETE, session.planId());
             this.peer = session.peer;
             this.success = session.isSuccess();
+            this.sessionIndex = session.sessionIndex();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 04bd7df..e775c90 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -35,10 +35,8 @@ public class StreamPlan
     private final UUID planId = UUIDGen.getTimeUUID();
     private final String description;
     private final List<StreamEventHandler> handlers = new ArrayList<>();
-
-    // sessions per InetAddress of the other end.
-    private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
     private final long repairedAt;
+    private final StreamCoordinator coordinator;
 
     private boolean flushBeforeTransfer = true;
 
@@ -49,16 +47,16 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1);
     }
 
-    public StreamPlan(String description, long repairedAt)
+    public StreamPlan(String description, long repairedAt, int connectionsPerHost)
     {
         this.description = description;
         this.repairedAt = repairedAt;
+        this.coordinator = new StreamCoordinator(connectionsPerHost);
     }
 
-
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
@@ -83,7 +81,7 @@ public class StreamPlan
      */
     public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = getOrCreateSession(from);
+        StreamSession session = coordinator.getOrCreateNextSession(from);
         session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
         return this;
     }
@@ -112,7 +110,7 @@ public class StreamPlan
      */
     public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = getOrCreateSession(to);
+        StreamSession session = coordinator.getOrCreateNextSession(to);
         session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
         return this;
     }
@@ -127,9 +125,9 @@ public class StreamPlan
      */
     public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
     {
-        StreamSession session = getOrCreateSession(to);
-        session.addTransferFiles(sstableDetails);
+        coordinator.transferFiles(to, sstableDetails);
         return this;
+
     }
 
     public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
@@ -145,7 +143,7 @@ public class StreamPlan
      */
     public boolean isEmpty()
     {
-        return sessions.isEmpty();
+        return coordinator.hasActiveSessions();
     }
 
     /**
@@ -155,7 +153,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, description, sessions.values(), handlers);
+        return StreamResultFuture.init(planId, description, handlers, coordinator);
     }
 
     /**
@@ -170,16 +168,4 @@ public class StreamPlan
         this.flushBeforeTransfer = flushBeforeTransfer;
         return this;
     }
-
-    private StreamSession getOrCreateSession(InetAddress peer)
-    {
-        StreamSession session = sessions.get(peer);
-        if (session == null)
-        {
-            session = new StreamSession(peer);
-            sessions.put(peer, session);
-        }
-        return session;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index dcffaff..c04c3f1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -23,10 +23,8 @@ import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,11 +47,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 
     public final UUID planId;
     public final String description;
+    private final StreamCoordinator coordinator;
     private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();
 
-    private final Map<InetAddress, StreamSession> ongoingSessions;
-    private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>();
-
     /**
      * Create new StreamResult of given {@code planId} and type.
      *
@@ -62,22 +58,25 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
      * @param planId Stream plan ID
      * @param description Stream description
      */
-    private StreamResultFuture(UUID planId, String description, Collection<StreamSession> sessions)
+    private StreamResultFuture(UUID planId, String description, StreamCoordinator coordinator)
     {
         this.planId = planId;
         this.description = description;
-        this.ongoingSessions = new HashMap<>(sessions.size());
-        for (StreamSession session : sessions)
-            this.ongoingSessions.put(session.peer, session);
+        this.coordinator = coordinator;
 
         // if there is no session to listen to, we immediately set result for returning
-        if (sessions.isEmpty())
+        if (!coordinator.hasActiveSessions())
             set(getCurrentState());
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
+    private StreamResultFuture(UUID planId, String description)
+    {
+        this(planId, description, new StreamCoordinator(0));
+    }
+
+    static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
     {
-        StreamResultFuture future = createAndRegister(planId, description, sessions);
+        StreamResultFuture future = createAndRegister(planId, description, coordinator);
         if (listeners != null)
         {
             for (StreamEventHandler listener : listeners)
@@ -85,18 +84,19 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         }
 
         logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
-        // start sessions
-        for (final StreamSession session : sessions)
+
+        // Initialize and start all sessions
+        for (final StreamSession session : coordinator.getAllStreamSessions())
         {
-            logger.info("[Stream #{}] Beginning stream session with {}", planId, session.peer);
             session.init(future);
-            session.start();
         }
+        coordinator.connectAllStreamSessions();
 
         return future;
     }
 
-    public static synchronized StreamResultFuture initReceivingSide(UUID planId,
+    public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
+                                                                    UUID planId,
                                                                     String description,
                                                                     InetAddress from,
                                                                     Socket socket,
@@ -106,35 +106,28 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            final StreamSession session = new StreamSession(from);
+            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, description, Collections.singleton(session));
+            future = new StreamResultFuture(planId, description);
             StreamManager.instance.registerReceiving(future);
-
-            session.init(future);
-            session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
-        }
-        else
-        {
-            future.attachSocket(from, socket, isForOutgoing, version);
-            logger.info("[Stream #{}] Received streaming plan for {}", planId,  description);
         }
+        future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+        logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
         return future;
     }
 
-    private static StreamResultFuture createAndRegister(UUID planId, String description, Collection<StreamSession> sessions)
+    private static StreamResultFuture createAndRegister(UUID planId, String description, StreamCoordinator coordinator)
     {
-        StreamResultFuture future = new StreamResultFuture(planId, description, sessions);
+        StreamResultFuture future = new StreamResultFuture(planId, description, coordinator);
         StreamManager.instance.register(future);
         return future;
     }
 
-    public void attachSocket(InetAddress from, Socket socket, boolean isForOutgoing, int version) throws IOException
+    private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
     {
-        StreamSession session = ongoingSessions.get(from);
-        if (session == null)
-            throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", from, planId));
+        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex);
+        session.init(this);
         session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
     }
 
@@ -149,7 +142,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
      */
     public StreamState getCurrentState()
     {
-        return new StreamState(planId, description, ImmutableSet.copyOf(sessionStates.values()));
+        return new StreamState(planId, description, coordinator.getAllSessionInfo());
     }
 
     @Override
@@ -170,44 +163,41 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
     void handleSessionPrepared(StreamSession session)
     {
         SessionInfo sessionInfo = session.getSessionInfo();
-        logger.info("[Stream #{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
+        logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
                               session.planId(),
+                              session.sessionIndex(),
                               sessionInfo.getTotalFilesToReceive(),
                               sessionInfo.getTotalSizeToReceive(),
                               sessionInfo.getTotalFilesToSend(),
                               sessionInfo.getTotalSizeToSend());
         StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
-        sessionStates.put(sessionInfo.peer, sessionInfo);
+        coordinator.addSessionInfo(sessionInfo);
         fireStreamEvent(event);
     }
 
     void handleSessionComplete(StreamSession session)
     {
         logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
-
-        SessionInfo sessionInfo = session.getSessionInfo();
-        sessionStates.put(sessionInfo.peer, sessionInfo);
         fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
-        maybeComplete(session);
+        maybeComplete();
     }
 
     public void handleProgress(ProgressInfo progress)
     {
-        sessionStates.get(progress.peer).updateProgress(progress);
+        coordinator.updateProgress(progress);
         fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
     }
 
-    void fireStreamEvent(StreamEvent event)
+    synchronized void fireStreamEvent(StreamEvent event)
     {
         // delegate to listener
         for (StreamEventHandler listener : eventListeners)
             listener.handleStreamEvent(event);
     }
 
-    private synchronized void maybeComplete(StreamSession session)
+    private synchronized void maybeComplete()
     {
-        ongoingSessions.remove(session.peer);
-        if (ongoingSessions.isEmpty())
+        if (!coordinator.hasActiveSessions())
         {
             StreamState finalState = getCurrentState();
             if (finalState.hasFailedSession())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c5f4cf9..a1c571b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -113,12 +112,8 @@ import org.apache.cassandra.utils.Pair;
 public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
-
-    // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming
-    // is directly handled by the ConnectionHandler incoming and outgoing threads.
-    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
-                                                                                                                            FBUtilities.getAvailableProcessors());
     public final InetAddress peer;
+    private final int index;
 
     // should not be null when session is started
     private StreamResultFuture streamResult;
@@ -153,9 +148,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
      *
      * @param peer Address of streaming peer
      */
-    public StreamSession(InetAddress peer)
+    public StreamSession(InetAddress peer, int index)
     {
         this.peer = peer;
+        this.index = index;
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(peer);
     }
@@ -165,6 +161,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         return streamResult == null ? null : streamResult.planId;
     }
 
+    public int sessionIndex()
+    {
+        return index;
+    }
+
     public String description()
     {
         return streamResult == null ? null : streamResult.description;
@@ -194,21 +195,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             return;
         }
 
-        streamExecutor.execute(new Runnable()
+        try
         {
-            public void run()
-            {
-                try
-                {
-                    handler.initiate();
-                    onInitializationComplete();
-                }
-                catch (IOException e)
-                {
-                    onError(e);
-                }
-            }
-        });
+            logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", planId(), sessionIndex(), peer);
+            handler.initiate();
+            onInitializationComplete();
+        }
+        catch (Exception e)
+        {
+            onError(e);
+        }
     }
 
     /**
@@ -519,7 +515,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
     {
-        ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
+        ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
         streamResult.handleProgress(progress);
     }
 
@@ -590,7 +586,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         List<StreamSummary> transferSummaries = Lists.newArrayList();
         for (StreamTask transfer : transfers.values())
             transferSummaries.add(transfer.getSummary());
-        return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
+        return new SessionInfo(peer, index, receivingSummaries, transferSummaries, state);
     }
 
     public synchronized void taskCompleted(StreamReceiveTask completedTask)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index b361b1b..a54498d 100644
--- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -32,18 +32,21 @@ public class ProgressInfoCompositeData
 {
     private static final String[] ITEM_NAMES = new String[]{"planId",
                                                             "peer",
+                                                            "sessionIndex",
                                                             "fileName",
                                                             "direction",
                                                             "currentBytes",
                                                             "totalBytes"};
     private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
                                                             "Session peer",
+                                                            "Index of session",
                                                             "Name of the file",
                                                             "Direction('IN' or 'OUT')",
                                                             "Current bytes transferred",
                                                             "Total bytes to transfer"};
     private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
                                                                    SimpleType.STRING,
+                                                                   SimpleType.INTEGER,
                                                                    SimpleType.STRING,
                                                                    SimpleType.STRING,
                                                                    SimpleType.LONG,
@@ -70,10 +73,11 @@ public class ProgressInfoCompositeData
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], planId.toString());
         valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
-        valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
-        valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
-        valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
-        valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+        valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
+        valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
+        valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
+        valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
+        valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
         try
         {
             return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -90,10 +94,11 @@ public class ProgressInfoCompositeData
         try
         {
             return new ProgressInfo(InetAddress.getByName((String) values[1]),
-                                    (String) values[2],
-                                    ProgressInfo.Direction.valueOf((String)values[3]),
-                                    (long) values[4],
-                                    (long) values[5]);
+                                    (int) values[2],
+                                    (String) values[3],
+                                    ProgressInfo.Direction.valueOf((String)values[4]),
+                                    (long) values[5],
+                                    (long) values[6]);
         }
         catch (UnknownHostException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index 658facf..8618cca 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -129,6 +129,7 @@ public class SessionInfoCompositeData
             }
         };
         SessionInfo info = new SessionInfo(peer,
+                                           (int)values[2],
                                            fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
                                            fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
                                            StreamSession.State.valueOf((String) values[4]));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 947e5d5..a9ec4ae 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -40,15 +40,17 @@ public class StreamInitMessage
     public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
 
     public final InetAddress from;
+    public final int sessionIndex;
     public final UUID planId;
     public final String description;
 
     // true if this init message is to connect for outgoing message on receiving side
     public final boolean isForOutgoing;
 
-    public StreamInitMessage(InetAddress from, UUID planId, String description, boolean isForOutgoing)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing)
     {
         this.from = from;
+        this.sessionIndex = sessionIndex;
         this.planId = planId;
         this.description = description;
         this.isForOutgoing = isForOutgoing;
@@ -99,6 +101,7 @@ public class StreamInitMessage
         public void serialize(StreamInitMessage message, DataOutputPlus out, int version) throws IOException
         {
             CompactEndpointSerializationHelper.serialize(message.from, out);
+            out.writeInt(message.sessionIndex);
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
             out.writeUTF(message.description);
             out.writeBoolean(message.isForOutgoing);
@@ -107,15 +110,17 @@ public class StreamInitMessage
         public StreamInitMessage deserialize(DataInput in, int version) throws IOException
         {
             InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+            int sessionIndex = in.readInt();
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
-            return new StreamInitMessage(from, planId, description, sentByInitiator);
+            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
         {
             long size = CompactEndpointSerializationHelper.serializedSize(message.from);
+            size += TypeSizes.NATIVE.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
             size += TypeSizes.NATIVE.sizeof(message.description);
             size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 50c3911..360b59e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.streaming.StreamSession;
 public abstract class StreamMessage
 {
     /** Streaming protocol version */
-    public static final int CURRENT_VERSION = 1;
+    public static final int CURRENT_VERSION = 2;
 
     public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 01952e5..6042f92 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -21,12 +21,10 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.config.EncryptionOptions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.commons.cli.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -35,6 +33,7 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -50,7 +49,6 @@ public class BulkLoader
 {
     private static final String TOOL_NAME = "sstableloader";
     private static final String VERBOSE_OPTION  = "verbose";
-    private static final String DEBUG_OPTION  = "debug";
     private static final String HELP_OPTION  = "help";
     private static final String NOPROGRESS_OPTION  = "no-progress";
     private static final String IGNORE_NODES_OPTION  = "ignore";
@@ -68,47 +66,63 @@ public class BulkLoader
     private static final String SSL_ALGORITHM = "ssl-alg";
     private static final String SSL_STORE_TYPE = "store-type";
     private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+    private static final String CONNECTIONS_PER_HOST = "connections-per-host";
 
     public static void main(String args[])
     {
         LoaderOptions options = LoaderOptions.parseArgs(args);
-        OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
+        OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, true);
+        SSTableLoader loader = new SSTableLoader(
+                options.directory,
+                new ExternalClient(
+                        options.hosts,
+                        options.rpcPort,
+                        options.user,
+                        options.passwd,
+                        options.transportFactory),
+                handler,
+                options.connectionsPerHost);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         StreamResultFuture future = null;
+
+        ProgressIndicator indicator = new ProgressIndicator();
         try
         {
             if (options.noProgress)
+            {
                 future = loader.stream(options.ignores);
+            }
             else
-                future = loader.stream(options.ignores, new ProgressIndicator());
+            {
+                future = loader.stream(options.ignores, indicator);
+            }
+
         }
         catch (Exception e)
         {
             System.err.println(e.getMessage());
             if (e.getCause() != null)
                 System.err.println(e.getCause());
-            if (options.debug)
-                e.printStackTrace(System.err);
-            else
-                System.err.println("Run with --debug to get full stack trace or --help to get help.");
+            e.printStackTrace(System.err);
             System.exit(1);
         }
 
-        handler.output(String.format("Streaming session ID: %s", future.planId));
-
         try
         {
             future.get();
+
+            if (!options.noProgress)
+                indicator.printSummary(options.connectionsPerHost);
+
+            // Give sockets time to gracefully close
+            Thread.sleep(1000);
             System.exit(0); // We need that to stop non daemonized threads
         }
         catch (Exception e)
         {
             System.err.println("Streaming to the following hosts failed:");
             System.err.println(loader.getFailedHosts());
-            System.err.println(e);
-            if (options.debug)
-                e.printStackTrace(System.err);
+            e.printStackTrace(System.err);
             System.exit(1);
         }
     }
@@ -116,13 +130,15 @@ public class BulkLoader
     // Return true when everything is at 100%
     static class ProgressIndicator implements StreamEventHandler
     {
-        private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
-        private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();
-
         private long start;
         private long lastProgress;
         private long lastTime;
 
+        private int peak = 0;
+        private int totalFiles = 0;
+
+        private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();
+
         public ProgressIndicator()
         {
             start = lastTime = System.nanoTime();
@@ -131,70 +147,100 @@ public class BulkLoader
         public void onSuccess(StreamState finalState) {}
         public void onFailure(Throwable t) {}
 
-        public void handleStreamEvent(StreamEvent event)
+        public synchronized void handleStreamEvent(StreamEvent event)
         {
             if (event.eventType == StreamEvent.Type.STREAM_PREPARED)
             {
                 SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
                 sessionsByHost.put(session.peer, session);
             }
-            else if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
+            else if (event.eventType == StreamEvent.Type.FILE_PROGRESS || event.eventType == StreamEvent.Type.STREAM_COMPLETE)
             {
-                ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
-
-                // update progress
-                Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
-                if (progresses == null)
+                ProgressInfo progressInfo = null;
+                if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
                 {
-                    progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
-                    progressByHost.put(progressInfo.peer, progresses);
+                    progressInfo = ((StreamEvent.ProgressEvent) event).progress;
                 }
-                if (progresses.contains(progressInfo))
-                    progresses.remove(progressInfo);
-                progresses.add(progressInfo);
+
+                long time = System.nanoTime();
+                long deltaTime = time - lastTime;
 
                 StringBuilder sb = new StringBuilder();
                 sb.append("\rprogress: ");
 
                 long totalProgress = 0;
                 long totalSize = 0;
-                for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet())
+
+                boolean updateTotalFiles = totalFiles == 0;
+                // recalculate progress across all sessions in all hosts and display
+                for (InetAddress peer : sessionsByHost.keySet())
                 {
-                    SessionInfo session = sessionsByHost.get(entry.getKey());
+                    sb.append("[").append(peer.toString()).append("]");
 
-                    long size = session.getTotalSizeToSend();
-                    long current = 0;
-                    int completed = 0;
-                    for (ProgressInfo progress : entry.getValue())
+                    for (SessionInfo session : sessionsByHost.get(peer))
                     {
-                        if (progress.currentBytes == progress.totalBytes)
-                            completed++;
-                        current += progress.currentBytes;
+                        long size = session.getTotalSizeToSend();
+                        long current = 0;
+                        int completed = 0;
+
+                        if (progressInfo != null && session.peer.equals(progressInfo.peer) && (session.sessionIndex == progressInfo.sessionIndex))
+                        {
+                            session.updateProgress(progressInfo);
+                        }
+                        for (ProgressInfo progress : session.getSendingFiles())
+                        {
+                            if (progress.isCompleted())
+                                completed++;
+                            current += progress.currentBytes;
+                        }
+                        totalProgress += current;
+
+                        totalSize += size;
+
+                        sb.append(session.sessionIndex).append(":");
+                        sb.append(completed).append("/").append(session.getTotalFilesToSend());
+                        sb.append(" ").append(String.format("%-3d", size == 0 ? 100L : current * 100L / size)).append("% ");
+
+                        if (updateTotalFiles)
+                            totalFiles += session.getTotalFilesToSend();
                     }
-                    totalProgress += current;
-                    totalSize += size;
-                    sb.append("[").append(entry.getKey());
-                    sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
-                    sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
                 }
-                long time = System.nanoTime();
-                long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
+
                 lastTime = time;
                 long deltaProgress = totalProgress - lastProgress;
                 lastProgress = totalProgress;
 
-                sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
-                sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
-                sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
+                sb.append("total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% ");
+                sb.append(String.format("%-3d", mbPerSec(deltaProgress, deltaTime))).append("MB/s");
+                int average = mbPerSec(totalProgress, (time - start));
+                if (average > peak)
+                    peak = average;
+                sb.append("(avg: ").append(average).append(" MB/s)");
 
-                System.out.print(sb.toString());
+                System.err.print(sb.toString());
             }
         }
 
-        private int mbPerSec(long bytes, long timeInMs)
+        private int mbPerSec(long bytes, long timeInNano)
         {
-            double bytesPerMs = ((double)bytes) / timeInMs;
-            return (int)((bytesPerMs * 1000) / (1024 * 2024));
+            double bytesPerNano = ((double)bytes) / timeInNano;
+            return (int)((bytesPerNano * 1000 * 1000 * 1000) / (1024 * 2024));
+        }
+
+        private void printSummary(int connectionsPerHost)
+        {
+            long end = System.nanoTime();
+            long durationMS = ((end - start) / (1000000));
+            int average = mbPerSec(lastProgress, (end - start));
+            StringBuilder sb = new StringBuilder();
+            sb.append("\nSummary statistics: \n");
+            sb.append(String.format("   %-30s: %-10d\n", "Connections per host: ", connectionsPerHost));
+            sb.append(String.format("   %-30s: %-10d\n", "Total files transferred: ", totalFiles));
+            sb.append(String.format("   %-30s: %-10d\n", "Total bytes transferred: ", lastProgress));
+            sb.append(String.format("   %-30s: %-10d\n", "Total duration (ms): ", durationMS));
+            sb.append(String.format("   %-30s: %-10d\n", "Average transfer rate (MB/s): ", + average));
+            sb.append(String.format("   %-30s: %-10d\n", "Peak transfer rate (MB/s): ", + peak));
+            System.err.println(sb.toString());
         }
     }
 
@@ -282,7 +328,7 @@ public class BulkLoader
             Cassandra.Client client = new Cassandra.Client(protocol);
             if (user != null && passwd != null)
             {
-                Map<String, String> credentials = new HashMap<String, String>();
+                Map<String, String> credentials = new HashMap<>();
                 credentials.put(IAuthenticator.USERNAME_KEY, user);
                 credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
                 AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
@@ -305,9 +351,10 @@ public class BulkLoader
         public int throttle = 0;
         public ITransportFactory transportFactory = new TFramedTransportFactory();
         public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+        public int connectionsPerHost = 1;
 
-        public final Set<InetAddress> hosts = new HashSet<InetAddress>();
-        public final Set<InetAddress> ignores = new HashSet<InetAddress>();
+        public final Set<InetAddress> hosts = new HashSet<>();
+        public final Set<InetAddress> ignores = new HashSet<>();
 
         LoaderOptions(File directory)
         {
@@ -354,7 +401,6 @@ public class BulkLoader
 
                 LoaderOptions opts = new LoaderOptions(dir);
 
-                opts.debug = cmd.hasOption(DEBUG_OPTION);
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
 
@@ -409,6 +455,9 @@ public class BulkLoader
                     }
                 }
 
+                if (cmd.hasOption(CONNECTIONS_PER_HOST))
+                    opts.connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
+
                 if(cmd.hasOption(SSL_TRUSTSTORE))
                 {
                     opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
@@ -520,7 +569,6 @@ public class BulkLoader
         private static CmdLineOptions getCmdLineOptions()
         {
             CmdLineOptions options = new CmdLineOptions();
-            options.addOption(null, DEBUG_OPTION,        "display stack traces");
             options.addOption("v",  VERBOSE_OPTION,      "verbose output");
             options.addOption("h",  HELP_OPTION,         "display this help message");
             options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
@@ -531,6 +579,7 @@ public class BulkLoader
             options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
             options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
             options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
+            options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host.");
             // ssl connection-related options
             options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
             options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 60fbf40..f015a01 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
         }
 
         StreamSummary sending = new StreamSummary(cfId, 10, 100);
-        SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
+        SessionInfo info = new SessionInfo(local, 0, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
 
         assert info.getTotalFilesToReceive() == 45;
         assert info.getTotalFilesToSend() == 10;
@@ -57,13 +57,13 @@ public class SessionInfoTest
         assert info.getTotalFilesSent() == 0;
 
         // receive in progress
-        info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 50, 100));
+        info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 50, 100));
         // still in progress, but not completed yet
         assert info.getTotalSizeReceived() == 50;
         assert info.getTotalSizeSent() == 0;
         assert info.getTotalFilesReceived() == 0;
         assert info.getTotalFilesSent() == 0;
-        info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 100, 100));
+        info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 100, 100));
         // 1 file should be completed
         assert info.getTotalSizeReceived() == 100;
         assert info.getTotalSizeSent() == 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index e22fa8f..c6c04b0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader
         String ks = "Keyspace1";
         String cf = "Standard1";
 
-        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), 0);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables


[2/3] git commit: Parallel streaming for sstableloader

Posted by yu...@apache.org.
Parallel streaming for sstableloader

patch by Joshua Mckenzie; reviewed by yukim for CASSANDRA-3668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44fa2cdb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44fa2cdb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44fa2cdb

Branch: refs/heads/trunk
Commit: 44fa2cdb48cf6b4e589d5c48f5c57f0a93c03b60
Parents: 31cd613
Author: Joshua McKenzie <jo...@datastax.com>
Authored: Thu May 1 17:51:12 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 1 17:51:12 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/io/sstable/SSTableLoader.java     |   9 +-
 .../net/IncomingStreamingConnection.java        |   2 +-
 .../cassandra/repair/StreamingRepairTask.java   |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |   7 +-
 .../cassandra/streaming/ProgressInfo.java       |  10 +-
 .../apache/cassandra/streaming/SessionInfo.java |   3 +
 .../cassandra/streaming/StreamCoordinator.java  | 272 +++++++++++++++++++
 .../apache/cassandra/streaming/StreamEvent.java |   2 +
 .../apache/cassandra/streaming/StreamPlan.java  |  34 +--
 .../cassandra/streaming/StreamResultFuture.java |  80 +++---
 .../cassandra/streaming/StreamSession.java      |  42 ++-
 .../management/ProgressInfoCompositeData.java   |  21 +-
 .../management/SessionInfoCompositeData.java    |   1 +
 .../streaming/messages/StreamInitMessage.java   |   9 +-
 .../streaming/messages/StreamMessage.java       |   2 +-
 .../org/apache/cassandra/tools/BulkLoader.java  | 169 ++++++++----
 .../cassandra/streaming/SessionInfoTest.java    |   6 +-
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 19 files changed, 500 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fcdfc..86940ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc1
+ * Parallel streaming for sstableloader (CASSANDRA-3668)
 Merged from 2.0:
  * Add Google Compute Engine snitch (CASSANDRA-7132)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b14e203..bbb1277 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler
     private final File directory;
     private final String keyspace;
     private final Client client;
+    private final int connectionsPerHost;
     private final OutputHandler outputHandler;
     private final Set<InetAddress> failedHosts = new HashSet<>();
 
@@ -61,10 +62,16 @@ public class SSTableLoader implements StreamEventHandler
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
     {
+        this(directory, client, outputHandler, 1);
+    }
+
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
+    {
         this.directory = directory;
         this.keyspace = directory.getParentFile().getName();
         this.client = client;
         this.outputHandler = outputHandler;
+        this.connectionsPerHost = connectionsPerHost;
     }
 
     protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
@@ -150,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load");
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost);
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 20392f2..003bbf9 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 636568c..b9184ca 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -71,7 +71,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
             repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
 
         logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
-        StreamResultFuture op = new StreamPlan("Repair", repairedAt)
+        StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
                                     .flushBeforeTransfer(true)
                                     // request ranges from the remote node
                                     .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 562645d..5484c83 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -217,7 +217,12 @@ public class ConnectionHandler
 
         public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
         {
-            StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
+            StreamInitMessage message = new StreamInitMessage(
+                    FBUtilities.getBroadcastAddress(),
+                    session.sessionIndex(),
+                    session.planId(),
+                    session.description(),
+                    isForOutgoing);
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
             getWriteChannel(socket).write(messageBuf);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index d308ed0..fdd3e97 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -49,16 +49,18 @@ public class ProgressInfo implements Serializable
     }
 
     public final InetAddress peer;
+    public final int sessionIndex;
     public final String fileName;
     public final Direction direction;
     public final long currentBytes;
     public final long totalBytes;
 
-    public ProgressInfo(InetAddress peer, String fileName, Direction direction, long currentBytes, long totalBytes)
+    public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
     {
         assert totalBytes > 0;
 
         this.peer = peer;
+        this.sessionIndex = sessionIndex;
         this.fileName = fileName;
         this.direction = direction;
         this.currentBytes = currentBytes;
@@ -70,7 +72,7 @@ public class ProgressInfo implements Serializable
      */
     public boolean isCompleted()
     {
-        return currentBytes == totalBytes;
+        return currentBytes >= totalBytes;
     }
 
     /**
@@ -87,13 +89,14 @@ public class ProgressInfo implements Serializable
         if (totalBytes != that.totalBytes) return false;
         if (direction != that.direction) return false;
         if (!fileName.equals(that.fileName)) return false;
+        if (sessionIndex != that.sessionIndex) return false;
         return peer.equals(that.peer);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(peer, fileName, direction, totalBytes);
+        return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
     }
 
     @Override
@@ -104,6 +107,7 @@ public class ProgressInfo implements Serializable
         sb.append("/").append(totalBytes).append(" bytes");
         sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
         sb.append(direction == Direction.OUT ? "sent to " : "received from ");
+        sb.append("idx:").append(sessionIndex);
         sb.append(peer);
         return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index b722ecf..98e945b 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
 public final class SessionInfo implements Serializable
 {
     public final InetAddress peer;
+    public final int sessionIndex;
     /** Immutable collection of receiving summaries */
     public final Collection<StreamSummary> receivingSummaries;
     /** Immutable collection of sending summaries*/
@@ -44,11 +45,13 @@ public final class SessionInfo implements Serializable
     private final Map<String, ProgressInfo> sendingFiles;
 
     public SessionInfo(InetAddress peer,
+                       int sessionIndex,
                        Collection<StreamSummary> receivingSummaries,
                        Collection<StreamSummary> sendingSummaries,
                        StreamSession.State state)
     {
         this.peer = peer;
+        this.sessionIndex = sessionIndex;
         this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
         this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
         this.receivingFiles = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
new file mode 100644
index 0000000..425b5b1
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple
+ * StreamSession and ProgressInfo instances per peer.
+ *
+ * This class coordinates multiple SessionStreams per peer in both the outgoing StreamPlan context and on the
+ * inbound StreamResultFuture context.
+ */
+public class StreamCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
+
+    // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the
+    // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
+    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+                                                                                                                            FBUtilities.getAvailableProcessors());
+
+    private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
+    private final int connectionsPerHost;
+
+    public StreamCoordinator(int connectionsPerHost)
+    {
+        this.connectionsPerHost = connectionsPerHost;
+    }
+
+    /**
+     * @return true if any stream session is active
+     */
+    public synchronized boolean hasActiveSessions()
+    {
+        for (HostStreamingData data : peerSessions.values())
+        {
+            if (data.hasActiveSessions())
+                return true;
+        }
+        return false;
+    }
+
+    public synchronized Collection<StreamSession> getAllStreamSessions()
+    {
+        Collection<StreamSession> results = new ArrayList<>();
+        for (HostStreamingData data : peerSessions.values())
+        {
+            results.addAll(data.getAllStreamSessions());
+        }
+        return results;
+    }
+
+    public void connectAllStreamSessions()
+    {
+        for (HostStreamingData data : peerSessions.values())
+            data.connectAllStreamSessions();
+    }
+
+    public synchronized Set<InetAddress> getPeers()
+    {
+        return new HashSet<>(peerSessions.keySet());
+    }
+
+    public synchronized StreamSession getOrCreateNextSession(InetAddress peer)
+    {
+        return getOrCreateHostData(peer).getOrCreateNextSession(peer);
+    }
+
+    public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id)
+    {
+        return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
+    }
+
+    public synchronized void updateProgress(ProgressInfo info)
+    {
+        getHostData(info.peer).updateProgress(info);
+    }
+
+    public synchronized void addSessionInfo(SessionInfo session)
+    {
+        HostStreamingData data = getOrCreateHostData(session.peer);
+        data.addSessionInfo(session);
+    }
+
+    public synchronized Set<SessionInfo> getAllSessionInfo()
+    {
+        Set<SessionInfo> result = new HashSet<>();
+        for (HostStreamingData data : peerSessions.values())
+        {
+            result.addAll(data.getAllSessionInfo());
+        }
+        return result;
+    }
+
+    public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    {
+        HostStreamingData sessionList = getOrCreateHostData(to);
+
+        if (connectionsPerHost > 1)
+        {
+            List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
+
+            for (List<StreamSession.SSTableStreamingSections> subList : buckets)
+            {
+                StreamSession session = sessionList.getOrCreateNextSession(to);
+                session.addTransferFiles(subList);
+            }
+        }
+        else
+        {
+            StreamSession session = sessionList.getOrCreateNextSession(to);
+            session.addTransferFiles(sstableDetails);
+        }
+    }
+
+    private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    {
+        // There's no point in divvying things up into more buckets than we have sstableDetails
+        int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
+        int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
+        int index = 0;
+
+        List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
+        List<StreamSession.SSTableStreamingSections> slice = null;
+        for (StreamSession.SSTableStreamingSections streamSession : sstableDetails)
+        {
+            if (index % step == 0)
+            {
+                slice = new ArrayList<>();
+                result.add(slice);
+            }
+            slice.add(streamSession);
+            ++index;
+        }
+
+        return result;
+    }
+
+    private HostStreamingData getHostData(InetAddress peer)
+    {
+        HostStreamingData data = peerSessions.get(peer);
+        if (data == null)
+            throw new IllegalArgumentException("Unknown peer requested: " + peer.toString());
+        return data;
+    }
+
+    private HostStreamingData getOrCreateHostData(InetAddress peer)
+    {
+        HostStreamingData data = peerSessions.get(peer);
+        if (data == null)
+        {
+            data = new HostStreamingData();
+            peerSessions.put(peer, data);
+        }
+        return data;
+    }
+
+    private class StreamSessionConnector implements Runnable
+    {
+        private final StreamSession session;
+        public StreamSessionConnector(StreamSession session)
+        {
+            this.session = session;
+        }
+
+        public void run()
+        {
+            session.start();
+            logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
+        }
+    }
+
+    private class HostStreamingData
+    {
+        private Map<Integer, StreamSession> streamSessions = new HashMap<>();
+        private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
+
+        private int lastReturned = -1;
+
+        public boolean hasActiveSessions()
+        {
+            for (StreamSession session : streamSessions.values())
+            {
+                StreamSession.State state = session.state();
+                if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
+                    return true;
+            }
+            return false;
+        }
+
+        public StreamSession getOrCreateNextSession(InetAddress peer)
+        {
+            // create
+            if (streamSessions.size() < connectionsPerHost)
+            {
+                StreamSession session = new StreamSession(peer, streamSessions.size());
+                streamSessions.put(++lastReturned, session);
+                return session;
+            }
+            // get
+            else
+            {
+                if (lastReturned == streamSessions.size() - 1)
+                    lastReturned = 0;
+
+                return streamSessions.get(lastReturned++);
+            }
+        }
+
+        public void connectAllStreamSessions()
+        {
+            for (StreamSession session : streamSessions.values())
+            {
+                streamExecutor.execute(new StreamSessionConnector(session));
+            }
+        }
+
+        public Collection<StreamSession> getAllStreamSessions()
+        {
+            return Collections.unmodifiableCollection(streamSessions.values());
+        }
+
+        public StreamSession getOrCreateSessionById(InetAddress peer, int id)
+        {
+            StreamSession session = streamSessions.get(id);
+            if (session == null)
+            {
+                session = new StreamSession(peer, id);
+                streamSessions.put(id, session);
+            }
+            return session;
+        }
+
+        public void updateProgress(ProgressInfo info)
+        {
+            sessionInfos.get(info.sessionIndex).updateProgress(info);
+        }
+
+        public void addSessionInfo(SessionInfo info)
+        {
+            sessionInfos.put(info.sessionIndex, info);
+        }
+
+        public Collection<SessionInfo> getAllSessionInfo()
+        {
+            return sessionInfos.values();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 9af1fbd..8089323 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -42,12 +42,14 @@ public abstract class StreamEvent
     {
         public final InetAddress peer;
         public final boolean success;
+        public final int sessionIndex;
 
         public SessionCompleteEvent(StreamSession session)
         {
             super(Type.STREAM_COMPLETE, session.planId());
             this.peer = session.peer;
             this.success = session.isSuccess();
+            this.sessionIndex = session.sessionIndex();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 04bd7df..e775c90 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -35,10 +35,8 @@ public class StreamPlan
     private final UUID planId = UUIDGen.getTimeUUID();
     private final String description;
     private final List<StreamEventHandler> handlers = new ArrayList<>();
-
-    // sessions per InetAddress of the other end.
-    private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
     private final long repairedAt;
+    private final StreamCoordinator coordinator;
 
     private boolean flushBeforeTransfer = true;
 
@@ -49,16 +47,16 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1);
     }
 
-    public StreamPlan(String description, long repairedAt)
+    public StreamPlan(String description, long repairedAt, int connectionsPerHost)
     {
         this.description = description;
         this.repairedAt = repairedAt;
+        this.coordinator = new StreamCoordinator(connectionsPerHost);
     }
 
-
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
@@ -83,7 +81,7 @@ public class StreamPlan
      */
     public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = getOrCreateSession(from);
+        StreamSession session = coordinator.getOrCreateNextSession(from);
         session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
         return this;
     }
@@ -112,7 +110,7 @@ public class StreamPlan
      */
     public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = getOrCreateSession(to);
+        StreamSession session = coordinator.getOrCreateNextSession(to);
         session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
         return this;
     }
@@ -127,9 +125,9 @@ public class StreamPlan
      */
     public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
     {
-        StreamSession session = getOrCreateSession(to);
-        session.addTransferFiles(sstableDetails);
+        coordinator.transferFiles(to, sstableDetails);
         return this;
+
     }
 
     public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
@@ -145,7 +143,7 @@ public class StreamPlan
      */
     public boolean isEmpty()
     {
-        return sessions.isEmpty();
+        return coordinator.hasActiveSessions();
     }
 
     /**
@@ -155,7 +153,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, description, sessions.values(), handlers);
+        return StreamResultFuture.init(planId, description, handlers, coordinator);
     }
 
     /**
@@ -170,16 +168,4 @@ public class StreamPlan
         this.flushBeforeTransfer = flushBeforeTransfer;
         return this;
     }
-
-    private StreamSession getOrCreateSession(InetAddress peer)
-    {
-        StreamSession session = sessions.get(peer);
-        if (session == null)
-        {
-            session = new StreamSession(peer);
-            sessions.put(peer, session);
-        }
-        return session;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index dcffaff..c04c3f1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -23,10 +23,8 @@ import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,11 +47,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 
     public final UUID planId;
     public final String description;
+    private final StreamCoordinator coordinator;
     private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();
 
-    private final Map<InetAddress, StreamSession> ongoingSessions;
-    private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>();
-
     /**
      * Create new StreamResult of given {@code planId} and type.
      *
@@ -62,22 +58,25 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
      * @param planId Stream plan ID
      * @param description Stream description
      */
-    private StreamResultFuture(UUID planId, String description, Collection<StreamSession> sessions)
+    private StreamResultFuture(UUID planId, String description, StreamCoordinator coordinator)
     {
         this.planId = planId;
         this.description = description;
-        this.ongoingSessions = new HashMap<>(sessions.size());
-        for (StreamSession session : sessions)
-            this.ongoingSessions.put(session.peer, session);
+        this.coordinator = coordinator;
 
         // if there is no session to listen to, we immediately set result for returning
-        if (sessions.isEmpty())
+        if (!coordinator.hasActiveSessions())
             set(getCurrentState());
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
+    private StreamResultFuture(UUID planId, String description)
+    {
+        this(planId, description, new StreamCoordinator(0));
+    }
+
+    static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
     {
-        StreamResultFuture future = createAndRegister(planId, description, sessions);
+        StreamResultFuture future = createAndRegister(planId, description, coordinator);
         if (listeners != null)
         {
             for (StreamEventHandler listener : listeners)
@@ -85,18 +84,19 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         }
 
         logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
-        // start sessions
-        for (final StreamSession session : sessions)
+
+        // Initialize and start all sessions
+        for (final StreamSession session : coordinator.getAllStreamSessions())
         {
-            logger.info("[Stream #{}] Beginning stream session with {}", planId, session.peer);
             session.init(future);
-            session.start();
         }
+        coordinator.connectAllStreamSessions();
 
         return future;
     }
 
-    public static synchronized StreamResultFuture initReceivingSide(UUID planId,
+    public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
+                                                                    UUID planId,
                                                                     String description,
                                                                     InetAddress from,
                                                                     Socket socket,
@@ -106,35 +106,28 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            final StreamSession session = new StreamSession(from);
+            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, description, Collections.singleton(session));
+            future = new StreamResultFuture(planId, description);
             StreamManager.instance.registerReceiving(future);
-
-            session.init(future);
-            session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
-        }
-        else
-        {
-            future.attachSocket(from, socket, isForOutgoing, version);
-            logger.info("[Stream #{}] Received streaming plan for {}", planId,  description);
         }
+        future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+        logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
         return future;
     }
 
-    private static StreamResultFuture createAndRegister(UUID planId, String description, Collection<StreamSession> sessions)
+    private static StreamResultFuture createAndRegister(UUID planId, String description, StreamCoordinator coordinator)
     {
-        StreamResultFuture future = new StreamResultFuture(planId, description, sessions);
+        StreamResultFuture future = new StreamResultFuture(planId, description, coordinator);
         StreamManager.instance.register(future);
         return future;
     }
 
-    public void attachSocket(InetAddress from, Socket socket, boolean isForOutgoing, int version) throws IOException
+    private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
     {
-        StreamSession session = ongoingSessions.get(from);
-        if (session == null)
-            throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", from, planId));
+        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex);
+        session.init(this);
         session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
     }
 
@@ -149,7 +142,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
      */
     public StreamState getCurrentState()
     {
-        return new StreamState(planId, description, ImmutableSet.copyOf(sessionStates.values()));
+        return new StreamState(planId, description, coordinator.getAllSessionInfo());
     }
 
     @Override
@@ -170,44 +163,41 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
     void handleSessionPrepared(StreamSession session)
     {
         SessionInfo sessionInfo = session.getSessionInfo();
-        logger.info("[Stream #{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
+        logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
                               session.planId(),
+                              session.sessionIndex(),
                               sessionInfo.getTotalFilesToReceive(),
                               sessionInfo.getTotalSizeToReceive(),
                               sessionInfo.getTotalFilesToSend(),
                               sessionInfo.getTotalSizeToSend());
         StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
-        sessionStates.put(sessionInfo.peer, sessionInfo);
+        coordinator.addSessionInfo(sessionInfo);
         fireStreamEvent(event);
     }
 
     void handleSessionComplete(StreamSession session)
     {
         logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
-
-        SessionInfo sessionInfo = session.getSessionInfo();
-        sessionStates.put(sessionInfo.peer, sessionInfo);
         fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
-        maybeComplete(session);
+        maybeComplete();
     }
 
     public void handleProgress(ProgressInfo progress)
     {
-        sessionStates.get(progress.peer).updateProgress(progress);
+        coordinator.updateProgress(progress);
         fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
     }
 
-    void fireStreamEvent(StreamEvent event)
+    synchronized void fireStreamEvent(StreamEvent event)
     {
         // delegate to listener
         for (StreamEventHandler listener : eventListeners)
             listener.handleStreamEvent(event);
     }
 
-    private synchronized void maybeComplete(StreamSession session)
+    private synchronized void maybeComplete()
     {
-        ongoingSessions.remove(session.peer);
-        if (ongoingSessions.isEmpty())
+        if (!coordinator.hasActiveSessions())
         {
             StreamState finalState = getCurrentState();
             if (finalState.hasFailedSession())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c5f4cf9..a1c571b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -113,12 +112,8 @@ import org.apache.cassandra.utils.Pair;
 public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
-
-    // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming
-    // is directly handled by the ConnectionHandler incoming and outgoing threads.
-    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
-                                                                                                                            FBUtilities.getAvailableProcessors());
     public final InetAddress peer;
+    private final int index;
 
     // should not be null when session is started
     private StreamResultFuture streamResult;
@@ -153,9 +148,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
      *
      * @param peer Address of streaming peer
      */
-    public StreamSession(InetAddress peer)
+    public StreamSession(InetAddress peer, int index)
     {
         this.peer = peer;
+        this.index = index;
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(peer);
     }
@@ -165,6 +161,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         return streamResult == null ? null : streamResult.planId;
     }
 
+    public int sessionIndex()
+    {
+        return index;
+    }
+
     public String description()
     {
         return streamResult == null ? null : streamResult.description;
@@ -194,21 +195,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             return;
         }
 
-        streamExecutor.execute(new Runnable()
+        try
         {
-            public void run()
-            {
-                try
-                {
-                    handler.initiate();
-                    onInitializationComplete();
-                }
-                catch (IOException e)
-                {
-                    onError(e);
-                }
-            }
-        });
+            logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", planId(), sessionIndex(), peer);
+            handler.initiate();
+            onInitializationComplete();
+        }
+        catch (Exception e)
+        {
+            onError(e);
+        }
     }
 
     /**
@@ -519,7 +515,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
     {
-        ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
+        ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
         streamResult.handleProgress(progress);
     }
 
@@ -590,7 +586,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         List<StreamSummary> transferSummaries = Lists.newArrayList();
         for (StreamTask transfer : transfers.values())
             transferSummaries.add(transfer.getSummary());
-        return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
+        return new SessionInfo(peer, index, receivingSummaries, transferSummaries, state);
     }
 
     public synchronized void taskCompleted(StreamReceiveTask completedTask)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index b361b1b..a54498d 100644
--- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -32,18 +32,21 @@ public class ProgressInfoCompositeData
 {
     private static final String[] ITEM_NAMES = new String[]{"planId",
                                                             "peer",
+                                                            "sessionIndex",
                                                             "fileName",
                                                             "direction",
                                                             "currentBytes",
                                                             "totalBytes"};
     private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
                                                             "Session peer",
+                                                            "Index of session",
                                                             "Name of the file",
                                                             "Direction('IN' or 'OUT')",
                                                             "Current bytes transferred",
                                                             "Total bytes to transfer"};
     private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
                                                                    SimpleType.STRING,
+                                                                   SimpleType.INTEGER,
                                                                    SimpleType.STRING,
                                                                    SimpleType.STRING,
                                                                    SimpleType.LONG,
@@ -70,10 +73,11 @@ public class ProgressInfoCompositeData
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], planId.toString());
         valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
-        valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
-        valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
-        valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
-        valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+        valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
+        valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
+        valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
+        valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
+        valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
         try
         {
             return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -90,10 +94,11 @@ public class ProgressInfoCompositeData
         try
         {
             return new ProgressInfo(InetAddress.getByName((String) values[1]),
-                                    (String) values[2],
-                                    ProgressInfo.Direction.valueOf((String)values[3]),
-                                    (long) values[4],
-                                    (long) values[5]);
+                                    (int) values[2],
+                                    (String) values[3],
+                                    ProgressInfo.Direction.valueOf((String)values[4]),
+                                    (long) values[5],
+                                    (long) values[6]);
         }
         catch (UnknownHostException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index 658facf..8618cca 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -129,6 +129,7 @@ public class SessionInfoCompositeData
             }
         };
         SessionInfo info = new SessionInfo(peer,
+                                           (int)values[2],
                                            fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
                                            fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
                                            StreamSession.State.valueOf((String) values[4]));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 947e5d5..a9ec4ae 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -40,15 +40,17 @@ public class StreamInitMessage
     public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
 
     public final InetAddress from;
+    public final int sessionIndex;
     public final UUID planId;
     public final String description;
 
     // true if this init message is to connect for outgoing message on receiving side
     public final boolean isForOutgoing;
 
-    public StreamInitMessage(InetAddress from, UUID planId, String description, boolean isForOutgoing)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing)
     {
         this.from = from;
+        this.sessionIndex = sessionIndex;
         this.planId = planId;
         this.description = description;
         this.isForOutgoing = isForOutgoing;
@@ -99,6 +101,7 @@ public class StreamInitMessage
         public void serialize(StreamInitMessage message, DataOutputPlus out, int version) throws IOException
         {
             CompactEndpointSerializationHelper.serialize(message.from, out);
+            out.writeInt(message.sessionIndex);
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
             out.writeUTF(message.description);
             out.writeBoolean(message.isForOutgoing);
@@ -107,15 +110,17 @@ public class StreamInitMessage
         public StreamInitMessage deserialize(DataInput in, int version) throws IOException
         {
             InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+            int sessionIndex = in.readInt();
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
-            return new StreamInitMessage(from, planId, description, sentByInitiator);
+            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
         {
             long size = CompactEndpointSerializationHelper.serializedSize(message.from);
+            size += TypeSizes.NATIVE.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
             size += TypeSizes.NATIVE.sizeof(message.description);
             size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 50c3911..360b59e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.streaming.StreamSession;
 public abstract class StreamMessage
 {
     /** Streaming protocol version */
-    public static final int CURRENT_VERSION = 1;
+    public static final int CURRENT_VERSION = 2;
 
     public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 01952e5..6042f92 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -21,12 +21,10 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.config.EncryptionOptions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.commons.cli.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -35,6 +33,7 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -50,7 +49,6 @@ public class BulkLoader
 {
     private static final String TOOL_NAME = "sstableloader";
     private static final String VERBOSE_OPTION  = "verbose";
-    private static final String DEBUG_OPTION  = "debug";
     private static final String HELP_OPTION  = "help";
     private static final String NOPROGRESS_OPTION  = "no-progress";
     private static final String IGNORE_NODES_OPTION  = "ignore";
@@ -68,47 +66,63 @@ public class BulkLoader
     private static final String SSL_ALGORITHM = "ssl-alg";
     private static final String SSL_STORE_TYPE = "store-type";
     private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+    private static final String CONNECTIONS_PER_HOST = "connections-per-host";
 
     public static void main(String args[])
     {
         LoaderOptions options = LoaderOptions.parseArgs(args);
-        OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
+        OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, true);
+        SSTableLoader loader = new SSTableLoader(
+                options.directory,
+                new ExternalClient(
+                        options.hosts,
+                        options.rpcPort,
+                        options.user,
+                        options.passwd,
+                        options.transportFactory),
+                handler,
+                options.connectionsPerHost);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         StreamResultFuture future = null;
+
+        ProgressIndicator indicator = new ProgressIndicator();
         try
         {
             if (options.noProgress)
+            {
                 future = loader.stream(options.ignores);
+            }
             else
-                future = loader.stream(options.ignores, new ProgressIndicator());
+            {
+                future = loader.stream(options.ignores, indicator);
+            }
+
         }
         catch (Exception e)
         {
             System.err.println(e.getMessage());
             if (e.getCause() != null)
                 System.err.println(e.getCause());
-            if (options.debug)
-                e.printStackTrace(System.err);
-            else
-                System.err.println("Run with --debug to get full stack trace or --help to get help.");
+            e.printStackTrace(System.err);
             System.exit(1);
         }
 
-        handler.output(String.format("Streaming session ID: %s", future.planId));
-
         try
         {
             future.get();
+
+            if (!options.noProgress)
+                indicator.printSummary(options.connectionsPerHost);
+
+            // Give sockets time to gracefully close
+            Thread.sleep(1000);
             System.exit(0); // We need that to stop non daemonized threads
         }
         catch (Exception e)
         {
             System.err.println("Streaming to the following hosts failed:");
             System.err.println(loader.getFailedHosts());
-            System.err.println(e);
-            if (options.debug)
-                e.printStackTrace(System.err);
+            e.printStackTrace(System.err);
             System.exit(1);
         }
     }
@@ -116,13 +130,15 @@ public class BulkLoader
     // Return true when everything is at 100%
     static class ProgressIndicator implements StreamEventHandler
     {
-        private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
-        private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();
-
         private long start;
         private long lastProgress;
         private long lastTime;
 
+        private int peak = 0;
+        private int totalFiles = 0;
+
+        private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();
+
         public ProgressIndicator()
         {
             start = lastTime = System.nanoTime();
@@ -131,70 +147,100 @@ public class BulkLoader
         public void onSuccess(StreamState finalState) {}
         public void onFailure(Throwable t) {}
 
-        public void handleStreamEvent(StreamEvent event)
+        public synchronized void handleStreamEvent(StreamEvent event)
         {
             if (event.eventType == StreamEvent.Type.STREAM_PREPARED)
             {
                 SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
                 sessionsByHost.put(session.peer, session);
             }
-            else if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
+            else if (event.eventType == StreamEvent.Type.FILE_PROGRESS || event.eventType == StreamEvent.Type.STREAM_COMPLETE)
             {
-                ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
-
-                // update progress
-                Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
-                if (progresses == null)
+                ProgressInfo progressInfo = null;
+                if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
                 {
-                    progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
-                    progressByHost.put(progressInfo.peer, progresses);
+                    progressInfo = ((StreamEvent.ProgressEvent) event).progress;
                 }
-                if (progresses.contains(progressInfo))
-                    progresses.remove(progressInfo);
-                progresses.add(progressInfo);
+
+                long time = System.nanoTime();
+                long deltaTime = time - lastTime;
 
                 StringBuilder sb = new StringBuilder();
                 sb.append("\rprogress: ");
 
                 long totalProgress = 0;
                 long totalSize = 0;
-                for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet())
+
+                boolean updateTotalFiles = totalFiles == 0;
+                // recalculate progress across all sessions in all hosts and display
+                for (InetAddress peer : sessionsByHost.keySet())
                 {
-                    SessionInfo session = sessionsByHost.get(entry.getKey());
+                    sb.append("[").append(peer.toString()).append("]");
 
-                    long size = session.getTotalSizeToSend();
-                    long current = 0;
-                    int completed = 0;
-                    for (ProgressInfo progress : entry.getValue())
+                    for (SessionInfo session : sessionsByHost.get(peer))
                     {
-                        if (progress.currentBytes == progress.totalBytes)
-                            completed++;
-                        current += progress.currentBytes;
+                        long size = session.getTotalSizeToSend();
+                        long current = 0;
+                        int completed = 0;
+
+                        if (progressInfo != null && session.peer.equals(progressInfo.peer) && (session.sessionIndex == progressInfo.sessionIndex))
+                        {
+                            session.updateProgress(progressInfo);
+                        }
+                        for (ProgressInfo progress : session.getSendingFiles())
+                        {
+                            if (progress.isCompleted())
+                                completed++;
+                            current += progress.currentBytes;
+                        }
+                        totalProgress += current;
+
+                        totalSize += size;
+
+                        sb.append(session.sessionIndex).append(":");
+                        sb.append(completed).append("/").append(session.getTotalFilesToSend());
+                        sb.append(" ").append(String.format("%-3d", size == 0 ? 100L : current * 100L / size)).append("% ");
+
+                        if (updateTotalFiles)
+                            totalFiles += session.getTotalFilesToSend();
                     }
-                    totalProgress += current;
-                    totalSize += size;
-                    sb.append("[").append(entry.getKey());
-                    sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
-                    sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
                 }
-                long time = System.nanoTime();
-                long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
+
                 lastTime = time;
                 long deltaProgress = totalProgress - lastProgress;
                 lastProgress = totalProgress;
 
-                sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
-                sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
-                sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
+                sb.append("total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% ");
+                sb.append(String.format("%-3d", mbPerSec(deltaProgress, deltaTime))).append("MB/s");
+                int average = mbPerSec(totalProgress, (time - start));
+                if (average > peak)
+                    peak = average;
+                sb.append("(avg: ").append(average).append(" MB/s)");
 
-                System.out.print(sb.toString());
+                System.err.print(sb.toString());
             }
         }
 
-        private int mbPerSec(long bytes, long timeInMs)
+        private int mbPerSec(long bytes, long timeInNano)
         {
-            double bytesPerMs = ((double)bytes) / timeInMs;
-            return (int)((bytesPerMs * 1000) / (1024 * 2024));
+            double bytesPerNano = ((double)bytes) / timeInNano;
+            return (int)((bytesPerNano * 1000 * 1000 * 1000) / (1024 * 2024));
+        }
+
+        private void printSummary(int connectionsPerHost)
+        {
+            long end = System.nanoTime();
+            long durationMS = ((end - start) / (1000000));
+            int average = mbPerSec(lastProgress, (end - start));
+            StringBuilder sb = new StringBuilder();
+            sb.append("\nSummary statistics: \n");
+            sb.append(String.format("   %-30s: %-10d\n", "Connections per host: ", connectionsPerHost));
+            sb.append(String.format("   %-30s: %-10d\n", "Total files transferred: ", totalFiles));
+            sb.append(String.format("   %-30s: %-10d\n", "Total bytes transferred: ", lastProgress));
+            sb.append(String.format("   %-30s: %-10d\n", "Total duration (ms): ", durationMS));
+            sb.append(String.format("   %-30s: %-10d\n", "Average transfer rate (MB/s): ", + average));
+            sb.append(String.format("   %-30s: %-10d\n", "Peak transfer rate (MB/s): ", + peak));
+            System.err.println(sb.toString());
         }
     }
 
@@ -282,7 +328,7 @@ public class BulkLoader
             Cassandra.Client client = new Cassandra.Client(protocol);
             if (user != null && passwd != null)
             {
-                Map<String, String> credentials = new HashMap<String, String>();
+                Map<String, String> credentials = new HashMap<>();
                 credentials.put(IAuthenticator.USERNAME_KEY, user);
                 credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
                 AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
@@ -305,9 +351,10 @@ public class BulkLoader
         public int throttle = 0;
         public ITransportFactory transportFactory = new TFramedTransportFactory();
         public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+        public int connectionsPerHost = 1;
 
-        public final Set<InetAddress> hosts = new HashSet<InetAddress>();
-        public final Set<InetAddress> ignores = new HashSet<InetAddress>();
+        public final Set<InetAddress> hosts = new HashSet<>();
+        public final Set<InetAddress> ignores = new HashSet<>();
 
         LoaderOptions(File directory)
         {
@@ -354,7 +401,6 @@ public class BulkLoader
 
                 LoaderOptions opts = new LoaderOptions(dir);
 
-                opts.debug = cmd.hasOption(DEBUG_OPTION);
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
 
@@ -409,6 +455,9 @@ public class BulkLoader
                     }
                 }
 
+                if (cmd.hasOption(CONNECTIONS_PER_HOST))
+                    opts.connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
+
                 if(cmd.hasOption(SSL_TRUSTSTORE))
                 {
                     opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
@@ -520,7 +569,6 @@ public class BulkLoader
         private static CmdLineOptions getCmdLineOptions()
         {
             CmdLineOptions options = new CmdLineOptions();
-            options.addOption(null, DEBUG_OPTION,        "display stack traces");
             options.addOption("v",  VERBOSE_OPTION,      "verbose output");
             options.addOption("h",  HELP_OPTION,         "display this help message");
             options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
@@ -531,6 +579,7 @@ public class BulkLoader
             options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
             options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
             options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
+            options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host.");
             // ssl connection-related options
             options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
             options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 60fbf40..f015a01 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
         }
 
         StreamSummary sending = new StreamSummary(cfId, 10, 100);
-        SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
+        SessionInfo info = new SessionInfo(local, 0, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
 
         assert info.getTotalFilesToReceive() == 45;
         assert info.getTotalFilesToSend() == 10;
@@ -57,13 +57,13 @@ public class SessionInfoTest
         assert info.getTotalFilesSent() == 0;
 
         // receive in progress
-        info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 50, 100));
+        info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 50, 100));
         // still in progress, but not completed yet
         assert info.getTotalSizeReceived() == 50;
         assert info.getTotalSizeSent() == 0;
         assert info.getTotalFilesReceived() == 0;
         assert info.getTotalFilesSent() == 0;
-        info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 100, 100));
+        info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 100, 100));
         // 1 file should be completed
         assert info.getTotalSizeReceived() == 100;
         assert info.getTotalSizeSent() == 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index e22fa8f..c6c04b0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader
         String ks = "Keyspace1";
         String cf = "Standard1";
 
-        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), 0);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ed57bf5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ed57bf5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ed57bf5

Branch: refs/heads/trunk
Commit: 0ed57bf5317ad259f39f4d3ff97a651d5c18d7cc
Parents: f0def34 44fa2cd
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu May 1 17:51:54 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 1 17:51:54 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/io/sstable/SSTableLoader.java     |   9 +-
 .../net/IncomingStreamingConnection.java        |   2 +-
 .../cassandra/repair/StreamingRepairTask.java   |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |   7 +-
 .../cassandra/streaming/ProgressInfo.java       |  10 +-
 .../apache/cassandra/streaming/SessionInfo.java |   3 +
 .../cassandra/streaming/StreamCoordinator.java  | 272 +++++++++++++++++++
 .../apache/cassandra/streaming/StreamEvent.java |   2 +
 .../apache/cassandra/streaming/StreamPlan.java  |  34 +--
 .../cassandra/streaming/StreamResultFuture.java |  80 +++---
 .../cassandra/streaming/StreamSession.java      |  42 ++-
 .../management/ProgressInfoCompositeData.java   |  21 +-
 .../management/SessionInfoCompositeData.java    |   1 +
 .../streaming/messages/StreamInitMessage.java   |   9 +-
 .../streaming/messages/StreamMessage.java       |   2 +-
 .../org/apache/cassandra/tools/BulkLoader.java  | 169 ++++++++----
 .../cassandra/streaming/SessionInfoTest.java    |   6 +-
 .../streaming/StreamTransferTaskTest.java       |   2 +-
 19 files changed, 500 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ed57bf5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0c32f3c,86940ea..5ec4c89
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
 +3.0
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7208)
 +
  2.1.0-rc1
+  * Parallel streaming for sstableloader (CASSANDRA-3668)
  Merged from 2.0:
   * Add Google Compute Engine snitch (CASSANDRA-7132)