You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:17 UTC

[5/5] git commit: Streaming 2.0

Streaming 2.0

patch by Yukim; reviewed by slebresne for CASSANDRA-5286


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51511697
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51511697
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51511697

Branch: refs/heads/trunk
Commit: 51511697254615b570f4162bbcd2baee23a234e9
Parents: 40b6c5d
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue May 28 20:28:13 2013 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jun 20 19:05:24 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/dht/BootStrapper.java  |  19 +-
 .../org/apache/cassandra/dht/RangeStreamer.java |  69 +--
 .../cassandra/hadoop/BulkRecordWriter.java      |  15 +-
 .../cassandra/io/sstable/SSTableLoader.java     | 145 ++---
 .../net/IncomingStreamingConnection.java        |  76 +++
 .../cassandra/net/IncomingTcpConnection.java    |  54 +-
 .../apache/cassandra/net/MessagingService.java  | 102 +---
 .../cassandra/service/StorageService.java       | 273 +++------
 .../streaming/AbstractStreamSession.java        | 114 ----
 .../cassandra/streaming/ConnectionHandler.java  | 309 ++++++++++
 .../cassandra/streaming/FileStreamTask.java     | 291 ---------
 .../cassandra/streaming/IStreamCallback.java    |  36 --
 .../streaming/IncomingStreamReader.java         | 200 -------
 .../cassandra/streaming/OperationType.java      |  32 -
 .../apache/cassandra/streaming/PendingFile.java | 180 ------
 .../cassandra/streaming/ProgressInfo.java       | 110 ++++
 .../apache/cassandra/streaming/SessionInfo.java | 187 ++++++
 .../apache/cassandra/streaming/StreamEvent.java |  81 +++
 .../cassandra/streaming/StreamEventHandler.java |  31 +
 .../cassandra/streaming/StreamException.java    |  35 ++
 .../cassandra/streaming/StreamHeader.java       |  94 ---
 .../apache/cassandra/streaming/StreamIn.java    |  89 ---
 .../cassandra/streaming/StreamInSession.java    | 251 --------
 .../cassandra/streaming/StreamManager.java      |  94 +++
 .../cassandra/streaming/StreamManagerMBean.java |  32 +
 .../apache/cassandra/streaming/StreamOut.java   | 187 ------
 .../cassandra/streaming/StreamOutSession.java   | 164 -----
 .../apache/cassandra/streaming/StreamPlan.java  | 169 ++++++
 .../cassandra/streaming/StreamReader.java       | 120 ++++
 .../cassandra/streaming/StreamReceiveTask.java  |  95 +++
 .../apache/cassandra/streaming/StreamReply.java |  91 ---
 .../streaming/StreamReplyVerbHandler.java       |  63 --
 .../cassandra/streaming/StreamRequest.java      | 174 ++----
 .../streaming/StreamRequestVerbHandler.java     |  47 --
 .../cassandra/streaming/StreamResultFuture.java | 172 ++++++
 .../cassandra/streaming/StreamSession.java      | 591 +++++++++++++++++++
 .../apache/cassandra/streaming/StreamState.java |  53 ++
 .../cassandra/streaming/StreamSummary.java      | 107 ++++
 .../apache/cassandra/streaming/StreamTask.java  |  55 ++
 .../cassandra/streaming/StreamTransferTask.java |  83 +++
 .../cassandra/streaming/StreamWriter.java       | 158 +++++
 .../streaming/StreamingRepairTask.java          |  75 +--
 .../cassandra/streaming/StreamingService.java   | 110 ----
 .../streaming/StreamingServiceMBean.java        |  41 --
 .../compress/CompressedFileStreamTask.java      | 160 -----
 .../compress/CompressedInputStream.java         |   7 +-
 .../compress/CompressedStreamReader.java        | 112 ++++
 .../compress/CompressedStreamWriter.java        | 129 ++++
 .../streaming/messages/CompleteMessage.java     |  42 ++
 .../streaming/messages/FileMessage.java         | 106 ++++
 .../streaming/messages/FileMessageHeader.java   | 151 +++++
 .../streaming/messages/PrepareMessage.java      |  91 +++
 .../streaming/messages/RetryMessage.java        |  57 ++
 .../messages/SessionFailedMessage.java          |  42 ++
 .../streaming/messages/StreamInitMessage.java   | 115 ++++
 .../streaming/messages/StreamMessage.java       | 109 ++++
 .../org/apache/cassandra/tools/BulkLoader.java  | 151 ++---
 .../org/apache/cassandra/tools/NodeCmd.java     |  61 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  29 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |   3 +-
 .../apache/cassandra/service/RemoveTest.java    |  38 +-
 .../cassandra/streaming/BootstrapTest.java      |  54 --
 .../cassandra/streaming/SerializationsTest.java | 220 -------
 .../cassandra/streaming/SessionInfoTest.java    |  73 +++
 .../apache/cassandra/streaming/StreamUtil.java  |  46 --
 .../streaming/StreamingTransferTest.java        | 198 ++++---
 .../compress/CompressedInputStreamTest.java     |   5 +-
 68 files changed, 4068 insertions(+), 3406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e1bf77..2b21d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,7 @@
  * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
  * Include a timestamp with all read commands to determine column expiration
    (CASSANDRA-5149)
+ * Streaming 2.0 (CASSANDRA-5286)
 
 1.2.6
  * Fix cross-DC mutation forwarding (CASSANDRA-5632)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 28615cf..55d82e1 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.OperationType;
 
 public class BootStrapper
 {
@@ -63,7 +63,7 @@ public class BootStrapper
         if (logger.isDebugEnabled())
             logger.debug("Beginning bootstrap process");
 
-        RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, OperationType.BOOTSTRAP);
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, "Bootstrap");
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
 
         for (String table : Schema.instance.getNonSystemTables())
@@ -72,8 +72,19 @@ public class BootStrapper
             streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
         }
 
-        streamer.fetch();
-        StorageService.instance.finishBootstrapping();
+        try
+        {
+            streamer.fetchAsync().get();
+            StorageService.instance.finishBootstrapping();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException("Interrupted while waiting on boostrap to complete. Bootstrap will have to be restarted.");
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException("Error during boostrap: " + e.getCause().getMessage(), e.getCause());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index a0e1a93..2196f4d 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -19,14 +19,10 @@ package org.apache.cassandra.dht;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.cassandra.streaming.IStreamCallback;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +34,9 @@ import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.streaming.StreamIn;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Assists in streaming ranges to a node.
@@ -50,12 +47,10 @@ public class RangeStreamer
 
     private final TokenMetadata metadata;
     private final InetAddress address;
-    private final OperationType opType;
+    private final String description;
     private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
     private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
-    // protected for testing.
-    protected CountDownLatch latch;
-    private Set<Range<Token>> completed = Collections.newSetFromMap(new ConcurrentHashMap<Range<Token>, Boolean>());
+    private final StreamPlan streamPlan;
 
     /**
      * A filter applied to sources to stream from when constructing a fetch map.
@@ -104,11 +99,12 @@ public class RangeStreamer
         }
     }
 
-    public RangeStreamer(TokenMetadata metadata, InetAddress address, OperationType opType)
+    public RangeStreamer(TokenMetadata metadata, InetAddress address, String description)
     {
         this.metadata = metadata;
         this.address = address;
-        this.opType = opType;
+        this.description = description;
+        this.streamPlan = new StreamPlan(description);
     }
 
     public void addSourceFilter(ISourceFilter filter)
@@ -123,7 +119,7 @@ public class RangeStreamer
         if (logger.isDebugEnabled())
         {
             for (Map.Entry<Range<Token>, InetAddress> entry: rangesForTable.entries())
-                logger.debug(String.format("%s: range %s exists on %s", opType, entry.getKey(), entry.getValue()));
+                logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue()));
         }
 
         for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForTable, sourceFilters).asMap().entrySet())
@@ -131,7 +127,7 @@ public class RangeStreamer
             if (logger.isDebugEnabled())
             {
                 for (Range r : entry.getValue())
-                    logger.debug(String.format("%s: range %s from source %s for table %s", opType, r, entry.getKey(), table));
+                    logger.debug(String.format("%s: range %s from source %s for table %s", description, r, entry.getKey(), table));
             }
             toFetch.put(table, entry);
         }
@@ -219,50 +215,19 @@ public class RangeStreamer
         return toFetch;
     }
 
-    public void fetch()
+    public StreamResultFuture fetchAsync()
     {
-        latch = new CountDownLatch(toFetch.entries().size());
-
         for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
         {
-            final String table = entry.getKey();
-            final InetAddress source = entry.getValue().getKey();
-            final Collection<Range<Token>> ranges = entry.getValue().getValue();
+            String keyspace = entry.getKey();
+            InetAddress source = entry.getValue().getKey();
+            Collection<Range<Token>> ranges = entry.getValue().getValue();
             /* Send messages to respective folks to stream data over to me */
-            IStreamCallback callback = new IStreamCallback()
-            {
-                public void onSuccess()
-                {
-                    completed.addAll(ranges);
-                    latch.countDown();
-                    if (logger.isDebugEnabled())
-                        logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s",
-                                     source, table, opType, latch.getCount()));
-                }
-
-                public void onFailure()
-                {
-                    latch.countDown();
-                    logger.warn("Streaming from " + source + " failed");
-                }
-            };
             if (logger.isDebugEnabled())
-                logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
-            StreamIn.requestRanges(source, table, ranges, callback, opType);
+                logger.debug("" + description + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
+            streamPlan.requestRanges(source, keyspace, ranges);
         }
 
-        try
-        {
-            latch.await();
-            for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
-            {
-                if (!completed.containsAll(entry.getValue().getValue()))
-                    throw new RuntimeException(String.format("Unable to fetch range %s for keyspace %s from any hosts", entry.getValue().getValue(), entry.getKey()));
-            }
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        return streamPlan.execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index de6d2e0..704d19f 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -23,6 +23,8 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -38,6 +40,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.hadoop.conf.Configuration;
@@ -221,7 +224,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         if (writer != null)
         {
             writer.close();
-            SSTableLoader.LoaderFuture future = loader.stream();
+            Future<StreamState> future = loader.stream();
             while (true)
             {
                 try
@@ -229,7 +232,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     future.get(1000, TimeUnit.MILLISECONDS);
                     break;
                 }
-                catch (TimeoutException te)
+                catch (ExecutionException | TimeoutException te)
                 {
                     progress.progress();
                 }
@@ -238,12 +241,12 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     throw new IOException(e);
                 }
             }
-            if (future.hadFailures())
+            if (loader.getFailedHosts().size() > 0)
             {
-                if (future.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + future.getFailedHosts());
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
                 else
-                    logger.warn("Some hosts failed: " + future.getFailedHosts());
+                    logger.warn("Some hosts failed: " + loader.getFailedHosts());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 56bb1d4..4d54cd9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -22,30 +22,29 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Cassandra SSTable bulk loader.
  * Load an externally created sstable into a cluster.
  */
-public class SSTableLoader
+public class SSTableLoader implements StreamEventHandler
 {
     private final File directory;
     private final String keyspace;
     private final Client client;
     private final OutputHandler outputHandler;
+    private final Set<InetAddress> failedHosts = new HashSet<>();
 
     static
     {
@@ -109,110 +108,61 @@ public class SSTableLoader
         return sstables;
     }
 
-    public LoaderFuture stream()
+    public StreamResultFuture stream()
     {
         return stream(Collections.<InetAddress>emptySet());
     }
 
-    public LoaderFuture stream(Set<InetAddress> toIgnore)
+    public StreamResultFuture stream(Set<InetAddress> toIgnore)
     {
         client.init(keyspace);
 
+        StreamPlan plan = new StreamPlan("Bulk Load");
         Collection<SSTableReader> sstables = openSSTables();
         if (sstables.isEmpty())
         {
-            outputHandler.output("No sstables to stream");
-            return new LoaderFuture(0);
+            // return empty result
+            return plan.execute();
         }
-
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
-        outputHandler.output(String.format("Streaming revelant part of %s to %s", names(sstables), endpointToRanges.keySet()));
+        outputHandler.output(String.format("Streaming relevant part of %sto %s", names(sstables), endpointToRanges.keySet()));
 
-        // There will be one streaming session by endpoint
-        LoaderFuture future = new LoaderFuture(endpointToRanges.size());
         for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : endpointToRanges.entrySet())
         {
             InetAddress remote = entry.getKey();
             if (toIgnore.contains(remote))
-            {
-                future.latch.countDown();
                 continue;
-            }
             Collection<Range<Token>> ranges = entry.getValue();
-            StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future, remote));
             // transferSSTables assumes references have been acquired
             SSTableReader.acquireReferences(sstables);
-            StreamOut.transferSSTables(session, sstables, ranges, OperationType.BULK_LOAD);
-            future.setPendings(remote, session.getFiles());
+            plan.transferFiles(remote, ranges, sstables);
         }
-        return future;
+        StreamResultFuture bulkResult = plan.execute();
+        bulkResult.addEventListener(this);
+        return bulkResult;
     }
 
-    public static class LoaderFuture implements Future<Void>
-    {
-        final CountDownLatch latch;
-        final Map<InetAddress, Collection<PendingFile>> pendingFiles;
-        private List<InetAddress> failedHosts = new ArrayList<InetAddress>();
-
-        private LoaderFuture(int request)
-        {
-            latch = new CountDownLatch(request);
-            pendingFiles = new HashMap<InetAddress, Collection<PendingFile>>();
-        }
-
-        private void setPendings(InetAddress remote, Collection<PendingFile> files)
-        {
-            pendingFiles.put(remote, new ArrayList(files));
-        }
+    public void onSuccess(StreamState finalState) {}
+    public void onFailure(Throwable t) {}
 
-        private void setFailed(InetAddress addr)
-        {
-            failedHosts.add(addr);
-        }
-
-        public List<InetAddress> getFailedHosts()
-        {
-            return failedHosts;
-        }
-
-        public boolean cancel(boolean mayInterruptIfRunning)
-        {
-            throw new UnsupportedOperationException("Cancellation is not yet supported");
-        }
-
-        public Void get() throws InterruptedException
-        {
-            latch.await();
-            return null;
-        }
-
-        public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
-        {
-            if (latch.await(timeout, unit))
-                return null;
-            else
-                throw new TimeoutException();
-        }
-
-        public boolean isCancelled()
-        {
-            // For now, cancellation is not supported, maybe one day...
-            return false;
-        }
-
-        public boolean isDone()
-        {
-            return latch.getCount() == 0;
-        }
-
-        public boolean hadFailures()
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
         {
-            return failedHosts.size() > 0;
+            ProgressInfo progress = ((StreamEvent.ProgressEvent) event).progress;
+            StringBuilder sb = new StringBuilder("\r");
+            sb.append(progress.fileName);
+            sb.append(": ");
+            sb.append(progress.currentBytes).append("/").append(progress.totalBytes);
+            System.out.print(sb.toString());
+            if (progress.currentBytes == progress.totalBytes)
+                System.out.println();
         }
-
-        public Map<InetAddress, Collection<PendingFile>> getPendingFiles()
+        else if (event.eventType == StreamEvent.Type.STREAM_COMPLETE)
         {
-            return pendingFiles;
+            StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent) event;
+            if (!se.success)
+                failedHosts.add(se.peer);
         }
     }
 
@@ -224,34 +174,9 @@ public class SSTableLoader
         return builder.toString();
     }
 
-    private class CountDownCallback implements IStreamCallback
+    public Set<InetAddress> getFailedHosts()
     {
-        private final InetAddress endpoint;
-        private final LoaderFuture future;
-
-        CountDownCallback(LoaderFuture future, InetAddress endpoint)
-        {
-            this.future = future;
-            this.endpoint = endpoint;
-        }
-
-        public void onSuccess()
-        {
-            future.latch.countDown();
-            outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, future.latch.getCount()));
-
-            // There could be race with stop being called twice but it should be ok
-            if (future.latch.getCount() == 0)
-                client.stop();
-        }
-
-        public void onFailure()
-        {
-            outputHandler.output(String.format("Streaming session to %s failed", endpoint));
-            future.setFailed(endpoint);
-            future.latch.countDown();
-            client.stop();
-        }
+        return failedHosts;
     }
 
     public static abstract class Client

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
new file mode 100644
index 0000000..dd5b7b4
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/**
+ * Thread to consume stream init messages.
+ */
+public class IncomingStreamingConnection extends Thread
+{
+    private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class);
+
+    private final int version;
+    private final Socket socket;
+
+    public IncomingStreamingConnection(int version, Socket socket)
+    {
+        super("stream-init " + socket.getRemoteSocketAddress());
+        this.version = version;
+        this.socket = socket;
+    }
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            // streaming connections are per-session and have a fixed version.  we can't do anything with a wrong-version stream connection, so drop it.
+            if (version != StreamMessage.CURRENT_VERSION)
+                throw new IOException(String.format("Received stream using protocol version %d (my version %d). Terminating connection", version, MessagingService.current_version));
+
+            DataInput input = new DataInputStream(socket.getInputStream());
+            StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version);
+
+            StreamSession.startReceivingStreamAsync(init.planId, init.description, socket, version);
+        }
+        catch (IOException e)
+        {
+            logger.debug("IOException reading from socket; closing", e);
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e2)
+            {
+                logger.debug("error closing socket", e2);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 1977086..4130006 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -24,24 +24,25 @@ import java.net.SocketException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyInputStream;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.streaming.IncomingStreamReader;
-import org.apache.cassandra.streaming.StreamHeader;
-import org.xerial.snappy.SnappyInputStream;
 
 public class IncomingTcpConnection extends Thread
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
 
+    private final int version;
+    private final boolean compressed;
     private final Socket socket;
     public InetAddress from;
 
-    public IncomingTcpConnection(Socket socket)
+    public IncomingTcpConnection(int version, boolean compressed, Socket socket)
     {
         assert socket != null;
+        this.version = version;
+        this.compressed = compressed;
         this.socket = socket;
         if (DatabaseDescriptor.getInternodeRecvBufferSize() != null)
         {
@@ -66,20 +67,10 @@ public class IncomingTcpConnection extends Thread
     {
         try
         {
-            // determine the connection type to decide whether to buffer
-            DataInputStream in = new DataInputStream(socket.getInputStream());
-            MessagingService.validateMagic(in.readInt());
-            int header = in.readInt();
-            boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
-            int version = MessagingService.getBits(header, 15, 8);
-            logger.debug("Connection version {} from {}", version, socket.getInetAddress());
-
-            if (isStream)
-                handleStream(in, version);
-            else if (version < MessagingService.VERSION_12)
-                handleLegacyVersion(version);
+            if (version < MessagingService.VERSION_12)
+                handleLegacyVersion();
             else
-                handleModernVersion(version, header);
+                handleModernVersion();
         }
         catch (EOFException e)
         {
@@ -96,7 +87,7 @@ public class IncomingTcpConnection extends Thread
         }
     }
 
-    private void handleModernVersion(int version, int header) throws IOException
+    private void handleModernVersion() throws IOException
     {
         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
         out.writeInt(MessagingService.current_version);
@@ -105,7 +96,6 @@ public class IncomingTcpConnection extends Thread
         DataInputStream in = new DataInputStream(socket.getInputStream());
         int maxVersion = in.readInt();
         from = CompactEndpointSerializationHelper.deserialize(in);
-        boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
 
         if (compressed)
         {
@@ -136,28 +126,11 @@ public class IncomingTcpConnection extends Thread
         }
     }
 
-    private void handleLegacyVersion(int version)
+    private void handleLegacyVersion()
     {
         throw new UnsupportedOperationException("Unable to read obsolete message version " + version + "; the earliest version supported is 1.2.0");
     }
 
-    private void handleStream(DataInputStream input, int version) throws IOException
-    {
-        if (version == MessagingService.current_version)
-        {
-            int size = input.readInt();
-            byte[] headerBytes = new byte[size];
-            input.readFully(headerBytes);
-            stream(StreamHeader.serializer.deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input);
-        }
-        else
-        {
-            // streaming connections are per-session and have a fixed version.  we can't do anything with a wrong-version stream connection, so drop it.
-            logger.error("Received stream using protocol version {} (my version {}). Terminating connection",
-                         version, MessagingService.current_version);
-        }
-    }
-
     private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
     {
         int id;
@@ -204,9 +177,4 @@ public class IncomingTcpConnection extends Thread
                 logger.debug("error closing socket", e);
         }
     }
-
-    private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException
-    {
-        new IncomingStreamReader(streamHeader, socket).read();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1569faf..8e94f8e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,13 +17,9 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.net.*;
-import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
@@ -53,7 +49,6 @@ import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
 import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
@@ -63,7 +58,6 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PrepareResponse;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -92,8 +86,8 @@ public final class MessagingService implements MessagingServiceMBean
         REQUEST_RESPONSE, // client-initiated reads and writes
         @Deprecated STREAM_INITIATE,
         @Deprecated STREAM_INITIATE_DONE,
-        STREAM_REPLY,
-        STREAM_REQUEST,
+        @Deprecated STREAM_REPLY,
+        @Deprecated STREAM_REQUEST,
         RANGE_SLICE,
         @Deprecated BOOTSTRAP_TOKEN,
         TREE_REQUEST,
@@ -192,8 +186,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MUTATION, RowMutation.serializer);
         put(Verb.READ_REPAIR, RowMutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
-        put(Verb.STREAM_REPLY, StreamReply.serializer);
-        put(Verb.STREAM_REQUEST, StreamRequest.serializer);
         put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
@@ -637,34 +629,6 @@ public final class MessagingService implements MessagingServiceMBean
         return iar;
     }
 
-    /**
-     * Stream a file from source to destination. This is highly optimized
-     * to not hold any of the contents of the file in memory.
-     *
-     * @param header Header contains file to stream and other metadata.
-     * @param to     endpoint to which we need to stream the file.
-     */
-
-    public void stream(StreamHeader header, InetAddress to)
-    {
-        DebuggableThreadPoolExecutor executor = streamExecutors.get(to);
-        if (executor == null)
-        {
-            // Using a core pool size of 0 is important. See documentation of streamExecutors.
-            executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("Streaming to " + to, 1, 1, TimeUnit.SECONDS);
-            DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, executor);
-            if (old != null)
-            {
-                executor.shutdown();
-                executor = old;
-            }
-        }
-
-        executor.execute(header.file == null || header.file.compressionInfo == null
-                         ? new FileStreamTask(header, to)
-                         : new CompressedFileStreamTask(header, to));
-    }
-
     public void register(ILatencySubscriber subcriber)
     {
         subscribers.add(subcriber);
@@ -763,44 +727,6 @@ public final class MessagingService implements MessagingServiceMBean
         return packed >>> (start + 1) - count & ~(-1 << count);
     }
 
-    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version)
-    {
-        int header = 0;
-        // set compression bit.
-        if (compress)
-            header |= 4;
-        // set streaming bit
-        header |= 8;
-        // Setting up the version bit
-        header |= (version << 8);
-
-        /* Adding the StreamHeader which contains the session Id along
-         * with the pendingfile info for the stream.
-         * | Session Id | Pending File Size | Pending File | Bool more files |
-         * | No. of Pending files | Pending Files ... |
-         */
-        byte[] bytes;
-        try
-        {
-            DataOutputBuffer buffer = new DataOutputBuffer();
-            StreamHeader.serializer.serialize(streamHeader, buffer, version);
-            bytes = buffer.getData();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        assert bytes.length > 0;
-
-        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
-        buffer.putInt(PROTOCOL_MAGIC);
-        buffer.putInt(header);
-        buffer.putInt(bytes.length);
-        buffer.put(bytes);
-        buffer.flip();
-        return buffer;
-    }
-
     /**
      * @return the last version associated with address, or @param version if this is the first such version
      */
@@ -885,9 +811,29 @@ public final class MessagingService implements MessagingServiceMBean
                 {
                     Socket socket = server.accept();
                     if (authenticate(socket))
-                        new IncomingTcpConnection(socket).start();
+                    {
+                        // determine the connection type to decide whether to buffer
+                        DataInputStream in = new DataInputStream(socket.getInputStream());
+                        MessagingService.validateMagic(in.readInt());
+                        int header = in.readInt();
+                        boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                        int version = MessagingService.getBits(header, 15, 8);
+                        logger.debug("Connection version {} from {}", version, socket.getInetAddress());
+
+                        if (isStream)
+                        {
+                            new IncomingStreamingConnection(version, socket).start();
+                        }
+                        else
+                        {
+                            boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
+                            new IncomingTcpConnection(version, compressed, socket).start();
+                        }
+                    }
                     else
+                    {
                         socket.close();
+                    }
                 }
                 catch (AsynchronousCloseException e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f6becd0..5e30207 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -37,6 +37,8 @@ import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.AtomicDouble;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.log4j.Level;
@@ -214,6 +216,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
             mbs.registerMBean(this, jmxObjectName);
+            mbs.registerMBean(StreamManager.instance, new ObjectName(StreamManager.OBJECT_NAME));
         }
         catch (Exception e)
         {
@@ -232,8 +235,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler());
 
         // see BootStrapper for a summary of how the bootstrap verbs interact
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAM_REQUEST, new StreamRequestVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAM_REPLY, new StreamReplyVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
@@ -253,10 +254,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
-
-        // spin up the streaming service so it is available for jmx tools.
-        if (StreamingService.instance == null)
-            throw new RuntimeException("Streaming service is unavailable.");
     }
 
     public void registerDaemon(CassandraDaemon daemon)
@@ -792,7 +789,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
 
-        RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), OperationType.REBUILD);
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
         if (sourceDc != null)
             streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
@@ -800,7 +797,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (String table : Schema.instance.getNonSystemTables())
             streamer.addRanges(table, getLocalRanges(table));
 
-        streamer.fetch();
+        try
+        {
+            streamer.fetchAsync().get(); // block until every requested range has been streamed
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException("Interrupted while waiting on rebuild streaming", e); // keep the cause (JDK type)
+        }
+        catch (ExecutionException e)
+        {
+            // This is used exclusively through JMX, so log the full trace but only throw a simple RTE
+            logger.error("Error while rebuilding node", e.getCause());
+            throw new RuntimeException("Error while rebuilding node: " + e.getCause().getMessage());
+        }
     }
 
     public void setStreamThroughputMbPerSec(int value)
@@ -1775,7 +1785,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private void restoreReplicaCount(InetAddress endpoint, final InetAddress notifyEndpoint)
     {
-        final Multimap<InetAddress, String> fetchSources = HashMultimap.create();
         Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
 
         final InetAddress myAddress = FBUtilities.getBroadcastAddress();
@@ -1792,40 +1801,37 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             Multimap<InetAddress, Range<Token>> sourceRanges = getNewSourceRanges(table, myNewRanges);
             for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet())
             {
-                fetchSources.put(entry.getKey(), table);
                 rangesToFetch.put(table, entry);
             }
         }
 
+        StreamPlan stream = new StreamPlan("Restore replica count");
         for (final String table : rangesToFetch.keySet())
         {
             for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(table))
             {
                 final InetAddress source = entry.getKey();
                 Collection<Range<Token>> ranges = entry.getValue();
-                final IStreamCallback callback = new IStreamCallback()
-                {
-                    public void onSuccess()
-                    {
-                        synchronized (fetchSources)
-                        {
-                            fetchSources.remove(source, table);
-                            if (fetchSources.isEmpty())
-                                sendReplicationNotification(notifyEndpoint);
-                        }
-                    }
-
-                    public void onFailure()
-                    {
-                        logger.warn("Streaming from " + source + " failed");
-                        onSuccess(); // calling onSuccess to send notification
-                    }
-                };
                 if (logger.isDebugEnabled())
                     logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
-                StreamIn.requestRanges(source, table, ranges, callback, OperationType.RESTORE_REPLICA_COUNT);
+                stream.requestRanges(source, table, ranges);
             }
         }
+        StreamResultFuture future = stream.execute();
+        Futures.addCallback(future, new FutureCallback<StreamState>()
+        {
+            public void onSuccess(StreamState finalState)
+            {
+                sendReplicationNotification(notifyEndpoint);
+            }
+
+            public void onFailure(Throwable t)
+            {
+                logger.warn("Streaming to restore replica count failed", t);
+                // We still want to send the notification
+                sendReplicationNotification(notifyEndpoint);
+            }
+        });
     }
 
     // needs to be modified to accept either a table or ARS.
@@ -2694,17 +2700,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         setMode(Mode.LEAVING, "streaming data to other nodes", true);
 
-        CountDownLatch latch = streamRanges(rangesToStream);
-        CountDownLatch hintsLatch = streamHints();
+        Future<StreamState> streamSuccess = streamRanges(rangesToStream);
+        Future<StreamState> hintsSuccess = streamHints();
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream aks.");
         try
         {
-            latch.await();
-            hintsLatch.await();
+            streamSuccess.get();
+            hintsSuccess.get();
         }
-        catch (InterruptedException e)
+        catch (ExecutionException | InterruptedException e)
         {
             throw new RuntimeException(e);
         }
@@ -2713,10 +2719,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         onFinish.run();
     }
 
-    private CountDownLatch streamHints()
+    private Future<StreamState> streamHints()
     {
         if (HintedHandOffManager.instance.listEndpointsPendingHints().size() == 0)
-            return new CountDownLatch(0);
+            return Futures.immediateFuture(null);
 
         // gather all live nodes in the cluster that aren't also leaving
         List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
@@ -2731,7 +2737,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (candidates.isEmpty())
         {
             logger.warn("Unable to stream hints since no live endpoints seen");
-            return new CountDownLatch(0);
+            return Futures.immediateFuture(null);
         }
         else
         {
@@ -2743,14 +2749,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             Token token = StorageService.getPartitioner().getMinimumToken();
             List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
 
-            CountDownLatch latch = new CountDownLatch(1);
-            StreamOut.transferRanges(hintsDestinationHost,
-                                     Table.open(Table.SYSTEM_KS),
-                                     Collections.singletonList(Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF)),
-                                     ranges,
-                                     new CountingDownStreamCallback(latch, hintsDestinationHost),
-                                     OperationType.UNBOOTSTRAP);
-            return latch;
+            return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
+                                                                      Table.SYSTEM_KS,
+                                                                      ranges,
+                                                                      SystemTable.HINTS_CF)
+                                                      .execute();
         }
     }
 
@@ -2812,25 +2815,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (relocator.streamsNeeded())
         {
             setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true);
-
-            relocator.logStreamsMap("[Move->STREAMING]");
-            CountDownLatch streamLatch = relocator.streams();
-
-            relocator.logRequestsMap("[Move->FETCHING]");
-            CountDownLatch fetchLatch = relocator.requests();
-
             try
             {
-                streamLatch.await();
-                fetchLatch.await();
+                relocator.stream().get();
             }
-            catch (InterruptedException e)
+            catch (ExecutionException | InterruptedException e) // streaming failure or interruption
             {
-                throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
+                throw new RuntimeException("Error while waiting for stream/fetch ranges to finish: " + e.getMessage());
             }
         }
         else
+        {
             setMode(Mode.MOVING, "No ranges to fetch/stream", true);
+        }
 
         setTokens(Collections.singleton(newToken)); // setting new token as we have everything settled
 
@@ -2840,8 +2837,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private class RangeRelocator
     {
-        private Map<String, Multimap<InetAddress, Range<Token>>> rangesToFetch = new HashMap<String, Multimap<InetAddress, Range<Token>>>();
-        private Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable = new HashMap<String, Multimap<Range<Token>, InetAddress>>();
+        private final StreamPlan streamPlan = new StreamPlan("Relocation"); // executed by move/relocate, not bootstrap
 
         private RangeRelocator(Collection<Token> tokens, List<String> tables)
         {
@@ -2856,15 +2852,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             // clone to avoid concurrent modification in calculateNaturalEndpoints
             TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
 
-            for (String table : tables)
+            for (String keyspace : tables)
             {
                 for (Token newToken : newTokens)
                 {
                     // replication strategy of the current keyspace (aka table)
-                    AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+                    AbstractReplicationStrategy strategy = Table.open(keyspace).getReplicationStrategy();
 
                     // getting collection of the currently used ranges by this keyspace
-                    Collection<Range<Token>> currentRanges = getRangesForEndpoint(table, localAddress);
+                    Collection<Range<Token>> currentRanges = getRangesForEndpoint(keyspace, localAddress);
                     // collection of ranges which this node will serve after move to the new token
                     Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata, newToken, localAddress);
 
@@ -2895,51 +2891,39 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
                     // calculating endpoints to stream current ranges to if needed
                     // in some situations node will handle current ranges as part of the new ranges
-                    Multimap<Range<Token>, InetAddress> rangeWithEndpoints = HashMultimap.create();
-
+                    Multimap<InetAddress, Range<Token>> endpointRanges = HashMultimap.create();
                     for (Range<Token> toStream : rangesPerTable.left)
                     {
                         Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
                         Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled));
                         logger.debug("Range:" + toStream + "Current endpoints: " + currentEndpoints + " New endpoints: " + newEndpoints);
-                        rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints));
+                        for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+                            endpointRanges.put(address, toStream);
                     }
 
-                    // associating table with range-to-endpoints map
-                    rangesToStreamByTable.put(table, rangeWithEndpoints);
+                    // stream ranges
+                    for (InetAddress address : endpointRanges.keySet())
+                        streamPlan.transferRanges(address, keyspace, endpointRanges.get(address));
 
+                    // stream requests
                     Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
-                    rangesToFetch.put(table, workMap);
+                    for (InetAddress address : workMap.keySet())
+                        streamPlan.requestRanges(address, keyspace, workMap.get(address));
 
                     if (logger.isDebugEnabled())
-                        logger.debug("Table {}: work map {}.", table, workMap);
+                        logger.debug("Table {}: work map {}.", keyspace, workMap);
                 }
             }
         }
 
-        private void logStreamsMap(String prefix)
-        {
-            logger.debug("{} Work map: {}", prefix, rangesToStreamByTable);
-        }
-
-        private void logRequestsMap(String prefix)
+        public Future<StreamState> stream()
         {
-            logger.debug("{} Work map: {}", prefix, rangesToFetch);
+            return streamPlan.execute();
         }
 
-        private boolean streamsNeeded()
+        public boolean streamsNeeded()
         {
-            return !rangesToStreamByTable.isEmpty() || !rangesToFetch.isEmpty();
-        }
-
-        private CountDownLatch streams()
-        {
-            return streamRanges(rangesToStreamByTable);
-        }
-
-        private CountDownLatch requests()
-        {
-            return requestRanges(rangesToFetch);
+            return !streamPlan.isEmpty();
         }
     }
 
@@ -2995,25 +2979,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (relocator.streamsNeeded())
         {
             setMode(Mode.RELOCATING, "fetching new ranges and streaming old ranges", true);
-
-            relocator.logStreamsMap("[Relocate->STREAMING]");
-            CountDownLatch streamLatch = relocator.streams();
-
-            relocator.logRequestsMap("[Relocate->FETCHING]");
-            CountDownLatch fetchLatch = relocator.requests();
-
             try
             {
-                streamLatch.await();
-                fetchLatch.await();
+                relocator.stream().get();
             }
-            catch (InterruptedException e)
+            catch (ExecutionException | InterruptedException e)
             {
                 throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
             }
         }
         else
+        {
             setMode(Mode.RELOCATING, "no new ranges to stream/fetch", true);
+        }
 
         Collection<Token> currentTokens = SystemTable.updateLocalTokens(tokens, Collections.<Token>emptyList());
         tokenMetadata.updateNormalTokens(currentTokens, FBUtilities.getBroadcastAddress());
@@ -3420,26 +3398,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * Seed data to the endpoints that will be responsible for it at the future
      *
      * @param rangesToStreamByTable tables and data ranges with endpoints included for each
-     * @return latch to count down
+     * @return a Future that yields the final StreamState once streaming of all ranges has finished
      */
-    private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable)
+    private Future<StreamState> streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable)
     {
         // First, we build a list of ranges to stream to each host, per table
         final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByTable = new HashMap<String, Map<InetAddress, List<Range<Token>>>>();
-        // The number of stream out sessions we need to start, to be built up as we build sessionsToStreamByTable
-        int sessionCount = 0;
-
         for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByTable.entrySet())
         {
+            String keyspace = entry.getKey();
             Multimap<Range<Token>, InetAddress> rangesWithEndpoints = entry.getValue();
 
             if (rangesWithEndpoints.isEmpty())
                 continue;
 
-            final String table = entry.getKey();
-
             Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<InetAddress, List<Range<Token>>>();
-
             for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
             {
                 final Range<Token> range = endPointEntry.getKey();
@@ -3454,12 +3427,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 curRanges.add(range);
             }
 
-            sessionCount += rangesPerEndpoint.size();
-            sessionsToStreamByTable.put(table, rangesPerEndpoint);
+            sessionsToStreamByTable.put(keyspace, rangesPerEndpoint);
         }
 
-        final CountDownLatch latch = new CountDownLatch(sessionCount);
-
+        StreamPlan streamPlan = new StreamPlan("Unbootstrap");
         for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByTable.entrySet())
         {
             final String table = entry.getKey();
@@ -3471,90 +3442,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 final InetAddress newEndpoint = rangesEntry.getKey();
 
                 // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                StreamOut.transferRanges(newEndpoint,
-                                         Table.open(table),
-                                         ranges,
-                                         new CountingDownStreamCallback(latch, newEndpoint),
-                                         OperationType.UNBOOTSTRAP);
-            }
-        }
-        return latch;
-    }
-
-    static class CountingDownStreamCallback implements IStreamCallback
-    {
-        private final CountDownLatch latch;
-        private final InetAddress targetAddr;
-
-        CountingDownStreamCallback(CountDownLatch latch, InetAddress targetAddr)
-        {
-            this.latch = latch;
-            this.targetAddr = targetAddr;
-        }
-
-        public void onSuccess()
-        {
-            latch.countDown();
-        }
-
-        public void onFailure()
-        {
-            logger.warn("Streaming to " + targetAddr + " failed");
-            onSuccess(); // calling onSuccess for latch countdown
-        }
-    }
-
-    /**
-     * Used to request ranges from endpoints in the ring (will block until all data is fetched and ready)
-     * @param ranges ranges to fetch as map of the preferred address and range collection
-     * @return latch to count down
-     */
-    private CountDownLatch requestRanges(final Map<String, Multimap<InetAddress, Range<Token>>> ranges)
-    {
-        final CountDownLatch latch = new CountDownLatch(ranges.keySet().size());
-        for (Map.Entry<String, Multimap<InetAddress, Range<Token>>> entry : ranges.entrySet())
-        {
-            Multimap<InetAddress, Range<Token>> endpointWithRanges = entry.getValue();
-
-            if (endpointWithRanges.isEmpty())
-            {
-                latch.countDown();
-                continue;
-            }
-
-            final String table = entry.getKey();
-            final Set<InetAddress> pending = new HashSet<InetAddress>(endpointWithRanges.keySet());
-
-            // Send messages to respective folks to stream data over to me
-            for (final InetAddress source: endpointWithRanges.keySet())
-            {
-                Collection<Range<Token>> toFetch = endpointWithRanges.get(source);
-
-                final IStreamCallback callback = new IStreamCallback()
-                {
-                    public void onSuccess()
-                    {
-                        pending.remove(source);
-
-                        if (pending.isEmpty())
-                            latch.countDown();
-                    }
-
-                    public void onFailure()
-                    {
-                        logger.warn("Streaming from " + source + " failed");
-                        onSuccess(); // calling onSuccess for latch countdown
-                    }
-                };
-
-                if (logger.isDebugEnabled())
-                    logger.debug("Requesting from " + source + " ranges " + StringUtils.join(toFetch, ", "));
-
-                // sending actual request
-                StreamIn.requestRanges(source, table, toFetch, callback, OperationType.BOOTSTRAP);
+                streamPlan.transferRanges(newEndpoint, table, ranges);
             }
         }
-        return latch;
+        return streamPlan.execute();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
deleted file mode 100644
index 89fbf5f..0000000
--- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.*;
-
-public abstract class AbstractStreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
-{
-    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
-
-    protected final InetAddress host;
-    protected final UUID sessionId;
-    protected String table;
-    protected final IStreamCallback callback;
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-    protected AbstractStreamSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
-    {
-        this.host = host;
-        this.sessionId = sessionId;
-        this.table = table;
-        this.callback = callback;
-        Gossiper.instance.register(this);
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
-    }
-
-    public UUID getSessionId()
-    {
-        return sessionId;
-    }
-
-    public InetAddress getHost()
-    {
-        return host;
-    }
-
-    public void close(boolean success)
-    {
-        if (!isClosed.compareAndSet(false, true))
-        {
-            logger.debug("Stream session {} already closed", getSessionId());
-            return;
-        }
-
-        closeInternal(success);
-
-        Gossiper.instance.unregister(this);
-        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-
-        logger.debug("closing with status " + success);
-        if (callback != null)
-        {
-            if (success)
-                callback.onSuccess();
-            else
-                callback.onFailure();
-        }
-    }
-
-    protected abstract void closeInternal(boolean success);
-
-    public void onJoin(InetAddress endpoint, EndpointState epState) {}
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
-    public void onDead(InetAddress endpoint, EndpointState state) {}
-
-    public void onRemove(InetAddress endpoint)
-    {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void onRestart(InetAddress endpoint, EndpointState epState)
-    {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void convict(InetAddress endpoint, double phi)
-    {
-        if (!endpoint.equals(getHost()))
-            return;
-
-        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
-        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
-            return;
-
-        logger.error("Stream failed because {} died or was restarted/removed (streams may still be active "
-                      + "in background, but further streams won't be started)", endpoint);
-        close(false);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
new file mode 100644
index 0000000..27ea5af
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/**
+ * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}.
+ *
+ * <p>
+ * Internally, ConnectionHandler manages a thread to receive incoming {@link StreamMessage}s and a thread to
+ * send outgoing messages. Messages are encoded/decoded on those threads and handed to
+ * {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}.
+ */
+public class ConnectionHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
+
+    private static final int MAX_CONNECT_ATTEMPTS = 3;
+
+    private final StreamSession session;
+    private final int protocolVersion;
+
+    private IncomingMessageHandler incoming;
+    private OutgoingMessageHandler outgoing;
+
+    private boolean connected = false;
+    private Socket socket;
+
+    ConnectionHandler(StreamSession session)
+    {
+        this.protocolVersion = StreamMessage.CURRENT_VERSION; // no version supplied here: use the current one
+        this.session = session; // the socket field is left unset; connect() assigns it
+    }
+
+    ConnectionHandler(StreamSession session, Socket socket, int protocolVersion)
+    {
+        this.socket = Preconditions.checkNotNull(socket); // rejects a null socket before anything else is read from it
+        this.connected = this.socket.isConnected();
+        this.protocolVersion = protocolVersion;
+        this.session = session;
+    }
+
+    /**
+     * Connect to peer and start exchanging messages.
+     * A failed attempt is retried after a growing delay, for at most MAX_CONNECT_ATTEMPTS attempts in total.
+     *
+     * @throws IOException when the last attempt failed, or when the thread is interrupted between two attempts.
+     */
+    public void connect() throws IOException
+    {
+        int attempts = 0;
+        while (true)
+        {
+            try
+            {
+                socket = MessagingService.instance().getConnectionPool(session.peer).newSocket();
+                socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+                break;
+            }
+            catch (IOException e)
+            {
+                if (++attempts >= MAX_CONNECT_ATTEMPTS)
+                    throw e;
+
+                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
+                logger.warn("Failed attempt " + attempts + " to connect to " + session.peer + ". Retrying in " + waitms + " ms. (" + e + ")");
+                try
+                {
+                    Thread.sleep(waitms);
+                }
+                catch (InterruptedException interrupted)
+                {
+                    // Thread.sleep() cleared the interrupt status when it threw: restore it so callers can still see it
+                    Thread.currentThread().interrupt();
+                    throw new IOException("interrupted", interrupted);
+                }
+            }
+        }
+        // send stream init message
+        SocketChannel channel = socket.getChannel();
+        WritableByteChannel out = channel;
+        // socket channel is null when encrypted(SSL)
+        if (channel == null)
+            out = Channels.newChannel(socket.getOutputStream());
+        logger.debug("Sending stream init...");
+        StreamInitMessage message = new StreamInitMessage(session.planId(), session.description());
+        out.write(message.createMessage(false, protocolVersion));
+
+        connected = true;
+
+        start();
+        session.onConnect();
+    }
+
+    public void close()
+    {
+        if (incoming != null) incoming.terminate(); // handlers are only assigned by start(); close() may come first
+        if (outgoing != null) outgoing.terminate();
+        if (socket != null && !isConnected()) // NOTE(review): an established socket is not closed here - confirm intended
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException ignore) {} // best effort: a failure to close is deliberately not reported
+        }
+    }
+
+    /**
+     * Creates the incoming and outgoing message handlers on top of the socket and starts one thread for each.
+     *
+     * @throws IOException if the streams of the socket cannot be obtained
+     */
+    public void start() throws IOException
+    {
+        // socket channel is null when encrypted(SSL): the socket streams are wrapped instead
+        SocketChannel socketChannel = socket.getChannel();
+        boolean hasChannel = socketChannel != null;
+        ReadableByteChannel reader = hasChannel ? socketChannel : Channels.newChannel(socket.getInputStream());
+        WritableByteChannel writer = hasChannel ? socketChannel : Channels.newChannel(socket.getOutputStream());
+
+        incoming = new IncomingMessageHandler(session, protocolVersion, reader);
+        outgoing = new OutgoingMessageHandler(session, protocolVersion, writer);
+
+        // ready to send/receive files
+        Thread receiver = new Thread(incoming, "STREAM-IN-" + session.peer);
+        receiver.start();
+        Thread sender = new Thread(outgoing, "STREAM-OUT-" + session.peer);
+        sender.start();
+    }
+
+    public boolean isConnected() // set by connect() after the init message is written, or from socket.isConnected() in the constructor
+    {
+        return connected; // never reset to false anywhere in this class
+    }
+
+    /**
+     * Enqueue every message of the collection, one {@link #sendMessage} call per element.
+     *
+     * @param messages messages to send
+     */
+    public void sendMessages(Collection<? extends StreamMessage> messages)
+    {
+        for (StreamMessage pending : messages)
+            this.sendMessage(pending);
+    }
+
+    public void sendMessage(StreamMessage message) // hands a single message to the outgoing handler's queue
+    {
+        assert isConnected();
+        outgoing.enqueue(message); // outgoing is assigned in start(), which therefore has to have run first
+    }
+
+    abstract static class MessageHandler implements Runnable // state shared by the incoming and outgoing handlers
+    {
+        private volatile boolean terminated; // set through terminate(), polled by the run() loops of the subclasses
+        protected final int protocolVersion;
+        protected final StreamSession session;
+
+        protected MessageHandler(StreamSession session, int protocolVersion)
+        {
+            this.protocolVersion = protocolVersion;
+            this.session = session;
+        }
+
+        public boolean terminated()
+        {
+            return terminated;
+        }
+
+        public void terminate()
+        {
+            terminated = true;
+        }
+    }
+
+    /** Receiving side of a connection: decodes messages from the channel and hands them to the session */
+    static class IncomingMessageHandler extends MessageHandler
+    {
+        private final ReadableByteChannel in;
+
+        IncomingMessageHandler(StreamSession session, int protocolVersion, ReadableByteChannel in)
+        {
+            super(session, protocolVersion);
+            this.in = in;
+        }
+
+        public void run()
+        {
+            while (!terminated())
+                receiveNext();
+        }
+
+        // a single loop iteration: read one message and dispatch it
+        private void receiveNext()
+        {
+            try
+            {
+                StreamMessage received = StreamMessage.deserialize(in, protocolVersion, session);
+                assert received != null;
+                session.messageReceived(received);
+            }
+            catch (SocketException e)
+            {
+                terminate(); // socket is closed
+            }
+            catch (Throwable e)
+            {
+                session.onError(e); // only reported: the loop keeps running until terminate() is called
+            }
+        }
+    }
+
+    /**
+     * Sending side of a connection: writes queued messages to the channel, highest priority first
+     */
+    static class OutgoingMessageHandler extends MessageHandler
+    {
+        /*
+         * All outgoing messages are queued up into messageQueue.
+         * The size will grow when a streaming request is received.
+         *
+         * The queue is a priority queue so that messages with a higher priority go out first.
+         */
+        private final PriorityBlockingQueue<StreamMessage> messageQueue = new PriorityBlockingQueue<>(64, new Comparator<StreamMessage>()
+        {
+            public int compare(StreamMessage o1, StreamMessage o2)
+            {
+                return Integer.compare(o2.getPriority(), o1.getPriority()); // reversed operands: larger value first, no overflow
+            }
+        });
+
+        private final WritableByteChannel out;
+
+        OutgoingMessageHandler(StreamSession session, int protocolVersion, WritableByteChannel out)
+        {
+            super(session, protocolVersion);
+            this.out = out;
+        }
+
+        public void enqueue(StreamMessage message)
+        {
+            messageQueue.put(message);
+        }
+
+        public void run()
+        {
+            while (!terminated())
+            {
+                try
+                {
+                    StreamMessage next = messageQueue.poll(1, TimeUnit.SECONDS); // timed poll: the flag is rechecked every second when idle
+                    if (next != null)
+                    {
+                        logger.debug("Sending {}", next);
+                        StreamMessage.serialize(next, out, protocolVersion, session);
+                        if (next.type == StreamMessage.Type.SESSION_FAILED)
+                            terminate(); // nothing is sent after a SESSION_FAILED message
+                    }
+                }
+                catch (SocketException e) // must precede the IOException clause below, being one of its subclasses
+                {
+                    session.onError(e);
+                    terminate();
+                }
+                catch (InterruptedException | IOException e)
+                {
+                    session.onError(e); // reported only: the loop keeps running until terminate() is called
+                }
+            }
+        }
+    }
+}