Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:10 UTC

[03/11] cassandra git commit: move streaming to use netty

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 3a95015..590ba5f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
 import java.util.UUID;
 
@@ -30,8 +28,7 @@ import com.google.common.collect.UnmodifiableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.compress.lzf.LZFInputStream;
-
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -42,9 +39,10 @@ import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.io.util.TrackedInputStream;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -88,12 +86,12 @@ public class StreamReader
     }
 
     /**
-     * @param channel where this reads data from
+     * @param inputPlus where this reads data from
      * @return SSTable transferred
      * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
-    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
-    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
     {
         long totalSize = totalSize();
 
@@ -108,7 +106,8 @@ public class StreamReader
                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
                      cfs.getTableName(), pendingRepair);
 
-        TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+        TrackedDataInputPlus in = new TrackedDataInputPlus(new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION));
         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
         SSTableMultiWriter writer = null;
         try
@@ -179,10 +178,10 @@ public class StreamReader
         private Row staticRow;
         private IOException exception;
 
-        public StreamDeserializer(TableMetadata metadata, InputStream in, Version version, SerializationHeader header) throws IOException
+        public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException
         {
             this.metadata = metadata;
-            this.in = new DataInputPlus.DataInputStreamPlus(in);
+            this.in = in;
             this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
             this.header = header;
         }
@@ -256,8 +255,8 @@ public class StreamReader
             // to what we do in hasNext)
             Unfiltered unfiltered = iterator.next();
             return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
-                 ? maybeMarkLocalToBeCleared((Row) unfiltered)
-                 : unfiltered;
+                   ? maybeMarkLocalToBeCleared((Row) unfiltered)
+                   : unfiltered;
         }
 
         private Row maybeMarkLocalToBeCleared(Row row)

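The net effect of the read() change above is a simple layering of inputs. A minimal sketch, assuming only the classes this commit introduces (TrackedDataInputPlus, StreamCompressionInputStream):

    import org.apache.cassandra.io.util.DataInputPlus;
    import org.apache.cassandra.io.util.TrackedDataInputPlus;
    import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
    import org.apache.cassandra.streaming.messages.StreamMessage;

    public class StreamInputSketch
    {
        // Layer the inputs the way the patched read() does: decompress the
        // netty-fed stream first, then wrap it so the count of consumed bytes
        // can drive progress reporting.
        static TrackedDataInputPlus wrap(DataInputPlus inputPlus)
        {
            return new TrackedDataInputPlus(
                new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION));
        }
    }
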
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveException.java b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
new file mode 100644
index 0000000..54b365a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+public class StreamReceiveException extends RuntimeException
+{
+    public final StreamSession session;
+
+    public StreamReceiveException(StreamSession session, String msg)
+    {
+        super(msg);
+        this.session = session;
+    }
+
+    public StreamReceiveException(StreamSession session, Throwable t)
+    {
+        super(t);
+        this.session = session;
+    }
+}

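Because this exception is unchecked and carries the session, deserialization code deep in the receive path can surface the owning session without threading it through every signature. A hypothetical caller (sketch only; the surrounding handler shape is assumed, not part of this patch):

    import java.io.IOException;

    import org.apache.cassandra.io.sstable.SSTableMultiWriter;
    import org.apache.cassandra.io.util.DataInputPlus;

    public class ReceiveSketch
    {
        // Wrap an IOException so whatever catches it far up the stack (e.g. a
        // netty channel handler) still knows which session to fail.
        static SSTableMultiWriter readFile(StreamReader reader, StreamSession session, DataInputPlus in)
        {
            try
            {
                return reader.read(in);
            }
            catch (IOException e)
            {
                throw new StreamReceiveException(session, e);
            }
        }
    }
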
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 925dc85..6aa70ad 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -104,6 +104,7 @@ public class StreamReceiveTask extends StreamTask
 
         remoteSSTablesReceived++;
         assert tableId.equals(sstable.getTableId());
+        logger.debug("received {} of {} total files", remoteSSTablesReceived, totalFiles);
 
         Collection<SSTableReader> finished = null;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 67d7d0d..9371c65 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,8 +17,9 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -27,7 +28,7 @@ import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.net.IncomingStreamingConnection;
+import io.netty.channel.Channel;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -103,12 +104,10 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     UUID planId,
                                                                     StreamOperation streamOperation,
                                                                     InetAddress from,
-                                                                    IncomingStreamingConnection connection,
-                                                                    boolean isForOutgoing,
-                                                                    int version,
+                                                                    Channel channel,
                                                                     boolean keepSSTableLevel,
                                                                     UUID pendingRepair,
-                                                                    PreviewKind previewKind) throws IOException
+                                                                    PreviewKind previewKind)
     {
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -119,7 +118,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind);
             StreamManager.instance.registerReceiving(future);
         }
-        future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
+        future.attachConnection(from, sessionIndex, channel);
         logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
         return future;
     }
@@ -131,11 +130,18 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         return future;
     }
 
-    private void attachConnection(InetAddress from, int sessionIndex, IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
+    public StreamCoordinator getCoordinator()
     {
-        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connection.socket.getInetAddress());
+        return coordinator;
+    }
+
+    private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
+    {
+        SocketAddress addr = channel.remoteAddress();
+        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
+        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
         session.init(this);
-        session.handler.initiateOnReceivingSide(connection, isForOutgoing, version);
+        session.attach(channel);
     }
 
     public void addEventListener(StreamEventHandler listener)
@@ -206,6 +212,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
 
     private synchronized void maybeComplete()
     {
+        logger.warn("[Stream #{}] maybeComplete", planId);
         if (!coordinator.hasActiveSessions())
         {
             StreamState finalState = getCurrentState();
@@ -221,4 +228,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             }
         }
     }
+
+    StreamSession getSession(InetAddress peer, int sessionIndex)
+    {
+        return coordinator.getSessionById(peer, sessionIndex);
+    }
 }

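attachConnection() now derives the connecting address from the netty channel rather than from a socket. The fallback logic, reduced to a standalone sketch:

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;

    import io.netty.channel.Channel;

    public class ConnectingAddressSketch
    {
        // Prefer the channel's remote address when it is an InetSocketAddress;
        // otherwise fall back to the peer's broadcast address we were handed.
        static InetAddress connectingAddress(Channel channel, InetAddress fallback)
        {
            SocketAddress addr = channel.remoteAddress();
            return addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : fallback;
        }
    }
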
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5ca9938..0381416 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -17,9 +17,8 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.IOException;
 import java.net.InetAddress;
-import java.net.Socket;
+import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.concurrent.*;
@@ -29,17 +28,21 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.*;
+import com.google.common.util.concurrent.Futures;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
@@ -47,10 +50,12 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
 import org.apache.cassandra.streaming.messages.*;
-import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
@@ -59,79 +64,80 @@ import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
 * Handles the streaming of one or more sections of one or more sstables to and from a specific
- * remote node.
+ * remote node. The sending side performs a block-level transfer of the source sstable, while the receiver
+ * must deserialize that data stream into partitions and rows, and then write that out as an sstable.
  *
- * Both this node and the remote one will create a similar symmetrical StreamSession. A streaming
+ * Both this node and the remote one will create a similar symmetrical {@link StreamSession}. A streaming
  * session has the following life-cycle:
  *
- * 1. Connections Initialization
+ * 1. Session Initialization
  *
- *   (a) A node (the initiator in the following) create a new StreamSession, initialize it (init())
- *       and then start it (start()). Start will create a {@link ConnectionHandler} that will create
- *       two connections to the remote node (the follower in the following) with whom to stream and send
- *       a StreamInit message. The first connection will be the incoming connection for the
- *       initiator, and the second connection will be the outgoing.
- *   (b) Upon reception of that StreamInit message, the follower creates its own StreamSession,
- *       initialize it if it still does not exist, and attach connecting socket to its ConnectionHandler
- *       according to StreamInit message's isForOutgoing flag.
- *   (d) When the both incoming and outgoing connections are established, StreamSession calls
- *       StreamSession#onInitializationComplete method to start the streaming prepare phase
- *       (StreamResultFuture.startStreaming()).
+ *   (a) A node (the initiator in the following) creates a new {@link StreamSession},
+ *       initializes it ({@link #init(StreamResultFuture)}), and then starts it ({@link #start()}).
+ *       Starting a session causes a {@link StreamInitMessage} to be sent.
+ *   (b) Upon reception of that {@link StreamInitMessage}, the follower creates its own {@link StreamSession},
+ *       initializing it if it does not yet exist.
+ *   (c) After the initiator sends the {@link StreamInitMessage}, it invokes
+ *       {@link StreamSession#onInitializationComplete()} to start the streaming prepare phase.
  *
  * 2. Streaming preparation phase
  *
- *   (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a
- *       PrepareMessage that includes what files/sections this node will stream to the follower
- *       (stored in a StreamTransferTask, each column family has it's own transfer task) and what
- *       the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has
- *       nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise,
- *       it waits for the follower PrepareMessage.
- *   (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive
- *       and send back its own PrepareMessage with a summary of the files/sections that will be sent to
- *       the initiator (prepare()). After having sent that message, the follower goes to its Streamning
- *       phase.
- *   (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will
- *       receive and then goes to his own Streaming phase.
+ *   (a) A {@link PrepareSynMessage} is sent that includes a) what files/sections this node will stream to the follower
+ *       (stored locally in a {@link StreamTransferTask}, one for each table) and b) what the follower needs to
+ *       stream back (stored locally in a {@link StreamReceiveTask}, one for each table).
+ *   (b) Upon reception of the {@link PrepareSynMessage}, the follower records which files/sections it will receive
+ *       and sends back a {@link PrepareSynAckMessage}, which contains a summary of the files/sections that will be sent to
+ *       the initiator.
+ *   (c) When the initiator receives the {@link PrepareSynAckMessage}, it records which files/sections it will
+ *       receive, and then goes to its Streaming phase (see next section). If the initiator is to receive files,
+ *       it sends a {@link PrepareAckMessage} to the follower to indicate that it can start streaming to the initiator.
+ *   (d) (Optional) If the follower receives a {@link PrepareAckMessage}, it enters its Streaming phase.
  *
  * 3. Streaming phase
  *
- *   (a) The streaming phase is started by each node (the sender in the follower, but note that each side
- *       of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles().
- *       This will sequentially send a FileMessage for each file of each SteamTransferTask. Each FileMessage
- *       consists of a FileMessageHeader that indicates which file is coming and then start streaming the
- *       content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the
- *       fileSent() method is called for that file. If all the files for a StreamTransferTask are sent
- *       (StreamTransferTask.complete()), the task is marked complete (taskCompleted()).
- *   (b) On the receiving side, a SSTable will be written for the incoming file (StreamReader in
- *       FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as
- *       complete (received()). When all files for the StreamReceiveTask have been received, the sstables
- *       are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task
- *       is marked complete (taskCompleted())
+ *   (a) The streaming phase is started at each node by calling {@link StreamSession#startStreamingFiles(boolean)}.
+ *       This will send, sequentially on each outbound streaming connection (see {@link NettyStreamingMessageSender}),
+ *       an {@link OutgoingFileMessage} for each file in each of the {@link StreamTransferTask}.
+ *       Each {@link OutgoingFileMessage} consists of a {@link FileMessageHeader} that contains metadata about the file
+ *       being streamed, followed by the file content itself. Once all the files for a {@link StreamTransferTask} are sent,
+ *       the task is marked complete {@link StreamTransferTask#complete(int)}.
+ *   (b) On the receiving side, a SSTable will be written for the incoming file, and once the file is fully received,
+ *       the file will be marked as complete ({@link StreamReceiveTask#received(SSTableMultiWriter)}). When all files
+ *       for the {@link StreamReceiveTask} have been received, the sstables are added to the CFS (and 2ndary indexes/MV are built),
+ *        and the task is marked complete ({@link #taskCompleted(StreamReceiveTask)}).
 *   (c) If during the streaming of a particular file an error occurs on the receiving end of a stream
- *       (FileMessage.deserialize), the node will send a SessionFailedMessage to the sender and close the stream session.
- *   (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase
- *       (maybeCompleted()).
+ *       (it may be either the initiator or the follower), the node will send a {@link SessionFailedMessage}
+ *       to the sender and close the stream session.
+ *   (d) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase
+ *       ({@link #maybeCompleted()}).
  *
  * 4. Completion phase
  *
- *   (a) When a node has finished all transfer and receive task, it enter the completion phase (maybeCompleted()).
- *       If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that
- *       session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
- *       send a CompleteMessage to the other side.
+ *   (a) When a node enters the completion phase, it sends a {@link CompleteMessage} to the peer and enters the
+ *       {@link StreamSession.State#WAIT_COMPLETE} state. If it has already received a {@link CompleteMessage}
+ *       from the peer, the session is complete and is then closed ({@link #closeSession(State)}). Otherwise, the node
+ *       remains in the {@link StreamSession.State#WAIT_COMPLETE} state until the peer's {@link CompleteMessage} arrives.
+ *
+ * In brief, the message passing looks like this (I for initiator, F for follower):
+ * (session init)
+ * I: StreamInitMessage
+ * (session prepare)
+ * I: PrepareSynMessage
+ * F: PrepareSynAckMessage
+ * I: PrepareAckMessage
+ * (stream - this can happen in both directions)
+ * I: OutgoingFileMessage
+ * F: ReceivedMessage
+ * (completion)
+ * I/F: CompleteMessage
+ *
+ * All messages which derive from {@link StreamMessage} are sent by the standard internode messaging
+ * (via {@link org.apache.cassandra.net.MessagingService}), while the actual files themselves are sent by a special
+ * "streaming" connection type. See {@link NettyStreamingMessageSender} for details.
  */
 public class StreamSession implements IEndpointStateChangeSubscriber
 {
-
-    /**
-     * Version where keep-alive support was added
-     */
-    private static final CassandraVersion STREAM_KEEP_ALIVE_VERSION = new CassandraVersion("3.10");
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
-    private static final DebuggableScheduledThreadPoolExecutor keepAliveExecutor = new DebuggableScheduledThreadPoolExecutor("StreamKeepAliveExecutor");
-    static {
-        // Immediately remove keep-alive task when cancelled.
-        keepAliveExecutor.setRemoveOnCancelPolicy(true);
-    }
 
     /**
      * Streaming endpoint.
@@ -139,7 +145,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming.
      */
     public final InetAddress peer;
+
     private final int index;
+
     /** Actual connecting address. Can be the same as {@linkplain #peer}. */
     public final InetAddress connecting;
 
@@ -154,20 +162,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // data receivers, filled after receiving prepare message
     private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;
-    /* can be null when session is created in remote */
-    private final StreamConnectionFactory factory;
 
-    public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
+    final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
 
-    public final ConnectionHandler handler;
+    private final NettyStreamingMessageSender messageSender;
+    private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
 
-    private AtomicBoolean isAborted = new AtomicBoolean(false);
+    private final AtomicBoolean isAborted = new AtomicBoolean(false);
     private final boolean keepSSTableLevel;
-    private ScheduledFuture<?> keepAliveFuture = null;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
-    public static enum State
+    public enum State
     {
         INITIALIZED,
         PREPARING,
@@ -184,17 +190,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * Create new streaming session with the peer.
      *  @param peer Address of streaming peer
      * @param connecting Actual connecting address
-     * @param factory is used for establishing connection
      */
     public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
     {
         this.peer = peer;
         this.connecting = connecting;
         this.index = index;
-        this.factory = factory;
-        this.handler = new ConnectionHandler(this, isKeepAliveSupported()?
-                                                   (int)TimeUnit.SECONDS.toMillis(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod()) :
-                                                   DatabaseDescriptor.getStreamingSocketTimeout(), previewKind.isPreview());
+
+        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0),
+                                                                              new InetSocketAddress(connecting, MessagingService.portFor(connecting)));
+        this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
         this.pendingRepair = pendingRepair;
@@ -242,12 +247,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return receivers.get(tableId).getTransaction();
     }
 
-    private boolean isKeepAliveSupported()
-    {
-        CassandraVersion peerVersion = Gossiper.instance.getReleaseVersion(peer);
-        return peerVersion != null && peerVersion.compareTo(STREAM_KEEP_ALIVE_VERSION) >= 0;
-    }
-
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.
@@ -258,13 +257,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         this.streamResult = streamResult;
         StreamHook.instance.reportStreamFuture(this, streamResult);
+    }
 
-        if (isKeepAliveSupported())
-            scheduleKeepAliveTask();
-        else
-            logger.debug("Peer {} does not support keep-alive.", peer);
+    public boolean attach(Channel channel)
+    {
+        if (!messageSender.hasControlChannel())
+            messageSender.injectControlMessageChannel(channel);
+        return incomingChannels.putIfAbsent(channel.id(), channel) == null;
     }
 
+    /**
+     * Invoked by the node that begins the stream session (it may be sending files, receiving files, or both).
+     */
     public void start()
     {
         if (requests.isEmpty() && transfers.isEmpty())
@@ -279,7 +283,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             logger.info("[Stream #{}] Starting streaming to {}{}", planId(),
                                                                    peer,
                                                                    peer.equals(connecting) ? "" : " through " + connecting);
-            handler.initiate();
+            messageSender.initialize();
             onInitializationComplete();
         }
         catch (Exception e)
@@ -289,12 +293,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         }
     }
 
-    public Socket createConnection() throws IOException
-    {
-        assert factory != null;
-        return factory.createConnection(connecting);
-    }
-
     /**
      * Request data fetch task to this session.
      *
@@ -317,7 +315,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * @param columnFamilies Transfer ColumnFamilies
      * @param flushTables flush tables?
      */
-    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+    synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
     {
         failIfFinished();
         Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
@@ -428,7 +426,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         }
     }
 
-    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
+    synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
     {
         failIfFinished();
         Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
@@ -472,31 +470,37 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         }
     }
 
-    private synchronized void closeSession(State finalState)
+    private synchronized Future closeSession(State finalState)
     {
+        Future abortedTasksFuture = null;
         if (isAborted.compareAndSet(false, true))
         {
             state(finalState);
 
+            // ensure aborting the tasks does not happen on the network IO thread (read: netty event loop)
+            // as we don't want any blocking disk IO to stop the network thread
             if (finalState == State.FAILED)
-            {
-                for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
-                    task.abort();
-            }
+                abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
 
-            if (keepAliveFuture != null)
-            {
-                logger.debug("[Stream #{}] Finishing keep-alive task.", planId());
-                keepAliveFuture.cancel(false);
-                keepAliveFuture = null;
-            }
-
-            // Note that we shouldn't block on this close because this method is called on the handler
-            // incoming thread (so we would deadlock).
-            handler.close();
+            incomingChannels.values().forEach(Channel::close);
+            messageSender.close();
 
             streamResult.handleSessionComplete(this);
         }
+        return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null);
+    }
+
+    private void abortTasks()
+    {
+        try
+        {
+            receivers.values().forEach(StreamReceiveTask::abort);
+            transfers.values().forEach(StreamTransferTask::abort);
+        }
+        catch (Exception e)
+        {
+            logger.warn("failed to abort some streaming tasks", e);
+        }
     }
 
     /**
@@ -517,6 +521,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return state;
     }
 
+    public NettyStreamingMessageSender getMessageSender()
+    {
+        return messageSender;
+    }
+
     /**
      * Return if this session completed successfully.
      *
@@ -531,27 +540,37 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         switch (message.type)
         {
-            case PREPARE:
-                PrepareMessage msg = (PrepareMessage) message;
+            case STREAM_INIT:
+                // nop
+                break;
+            case PREPARE_SYN:
+                PrepareSynMessage msg = (PrepareSynMessage) message;
                 prepare(msg.requests, msg.summaries);
                 break;
-
+            case PREPARE_SYNACK:
+                prepareSynAck((PrepareSynAckMessage) message);
+                break;
+            case PREPARE_ACK:
+                prepareAck((PrepareAckMessage) message);
+                break;
             case FILE:
                 receive((IncomingFileMessage) message);
                 break;
-
             case RECEIVED:
                 ReceivedMessage received = (ReceivedMessage) message;
                 received(received.tableId, received.sequenceNumber);
                 break;
-
             case COMPLETE:
                 complete();
                 break;
-
+            case KEEP_ALIVE:
+                // NOP - we only send/receive the KEEP_ALIVE to force the TCP connection to remain open
+                break;
             case SESSION_FAILED:
                 sessionFailed();
                 break;
+            default:
+                throw new AssertionError("unhandled StreamMessage type: " + message.getClass().getName());
         }
     }
 
@@ -562,55 +581,43 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         // send prepare message
         state(State.PREPARING);
-        PrepareMessage prepare = new PrepareMessage();
+        PrepareSynMessage prepare = new PrepareSynMessage();
         prepare.requests.addAll(requests);
         for (StreamTransferTask task : transfers.values())
             prepare.summaries.add(task.getSummary());
-        handler.sendMessage(prepare);
-
-        // if we don't need to prepare for receiving stream, start sending files immediately
-        if (requests.isEmpty())
-            startStreamingFiles();
+        messageSender.sendMessage(prepare);
     }
 
-    /**l
+    /**
      * Call back for handling exception during streaming.
-     *
-     * @param e thrown exception
      */
-    public void onError(Throwable e)
+    public Future onError(Throwable e)
     {
         logError(e);
         // send session failure message
-        if (handler.isOutgoingConnected())
-            handler.sendMessage(new SessionFailedMessage());
+        if (messageSender.connected())
+            messageSender.sendMessage(new SessionFailedMessage());
         // fail session
-        closeSession(State.FAILED);
+        return closeSession(State.FAILED);
     }
 
     private void logError(Throwable e)
     {
         if (e instanceof SocketTimeoutException)
         {
-            if (isKeepAliveSupported())
-                logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
-                             "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(),
-                             peer.getHostAddress(),
-                             peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
-                             2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
-                             e);
-            else
-                logger.error("[Stream #{}] Streaming socket timed out. This means the session peer stopped responding or " +
-                             "is still processing received data. If there is no sign of failure in the other end or a very " +
-                             "dense table is being transferred you may want to increase streaming_socket_timeout_in_ms " +
-                             "property. Current value is {}ms.", planId(), DatabaseDescriptor.getStreamingSocketTimeout(), e);
+            logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
+                         "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(),
+                         peer.getHostAddress(),
+                         peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+                         2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
+                         e);
         }
         else
         {
             logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(),
-                                                                                            peer.getHostAddress(),
-                                                                                            peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
-                                                                                            e);
+                         peer.getHostAddress(),
+                         peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+                         e);
         }
     }
 
@@ -621,29 +628,55 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         // prepare tasks
         state(State.PREPARING);
+        ScheduledExecutors.nonPeriodicTasks.execute(() -> prepareAsync(requests, summaries));
+    }
+
+    /**
+     * Finish preparing the session. This method is blocking (memtables are flushed in {@link #addTransferRanges}),
+     * so the logic should not execute on the main IO thread (read: netty event loop).
+     */
+    private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
+    {
         for (StreamRequest request : requests)
             addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
         for (StreamSummary summary : summaries)
             prepareReceiving(summary);
 
-        // send back prepare message if prepare message contains stream request
-        if (!requests.isEmpty())
-        {
-            PrepareMessage prepare = new PrepareMessage();
+        PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
+        if (!peer.equals(FBUtilities.getBroadcastAddress()))
             for (StreamTransferTask task : transfers.values())
-                prepare.summaries.add(task.getSummary());
-            handler.sendMessage(prepare);
+                prepareSynAck.summaries.add(task.getSummary());
+        messageSender.sendMessage(prepareSynAck);
+
+        streamResult.handleSessionPrepared(this);
+        maybeCompleted();
+    }
+
+    private void prepareSynAck(PrepareSynAckMessage msg)
+    {
+        if (!msg.summaries.isEmpty())
+        {
+            for (StreamSummary summary : msg.summaries)
+                prepareReceiving(summary);
+
+            // only send the (final) ACK if we are expecting the peer to send this node (the initiator) some files
+            messageSender.sendMessage(new PrepareAckMessage());
         }
 
         if (isPreview())
-        {
             completePreview();
-            return;
-        }
+        else
+            startStreamingFiles(true);
+    }
 
-        // if there are files to stream
-        if (!maybeCompleted())
-            startStreamingFiles();
+    private void prepareAck(PrepareAckMessage msg)
+    {
+        if (isPreview())
+            completePreview();
+        else
+            startStreamingFiles(true);
     }
 
     /**
@@ -665,7 +698,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     }
 
     /**
-     * Call back after receiving FileMessageHeader.
+     * Call back after receiving a streamed file.
      *
      * @param message received file
      */
@@ -680,7 +713,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         StreamingMetrics.totalIncomingBytes.inc(headerSize);
         metrics.incomingBytes.inc(headerSize);
         // send back file received message
-        handler.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
+        messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
         receivers.get(message.header.tableId).received(message.sstable);
     }
 
@@ -700,11 +733,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public synchronized void complete()
     {
+        logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent);
         if (state == State.WAIT_COMPLETE)
         {
             if (!completeSent)
             {
-                handler.sendMessage(new CompleteMessage());
+                messageSender.sendMessage(new CompleteMessage());
                 completeSent = true;
             }
             closeSession(State.COMPLETE);
@@ -712,17 +746,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         else
         {
             state(State.WAIT_COMPLETE);
-            handler.closeIncoming();
-        }
-    }
-
-    private synchronized void scheduleKeepAliveTask()
-    {
-        if (keepAliveFuture == null)
-        {
-            int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
-            logger.debug("[Stream #{}] Scheduling keep-alive task with {}s period.", planId(), keepAlivePeriod);
-            keepAliveFuture = keepAliveExecutor.scheduleAtFixedRate(new KeepAliveTask(), 0, keepAlivePeriod, TimeUnit.SECONDS);
         }
     }
 
@@ -804,7 +827,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             {
                 if (!completeSent)
                 {
-                    handler.sendMessage(new CompleteMessage());
+                    messageSender.sendMessage(new CompleteMessage());
                     completeSent = true;
                 }
                 closeSession(State.COMPLETE);
@@ -812,10 +835,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             else
             {
                 // notify peer that this session is completed
-                handler.sendMessage(new CompleteMessage());
+                messageSender.sendMessage(new CompleteMessage());
                 completeSent = true;
                 state(State.WAIT_COMPLETE);
-                handler.closeOutgoing();
             }
         }
         return completed;
@@ -840,46 +862,30 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
     }
 
-    private void startStreamingFiles()
+    private void startStreamingFiles(boolean notifyPrepared)
     {
-        streamResult.handleSessionPrepared(this);
+        if (notifyPrepared)
+            streamResult.handleSessionPrepared(this);
 
         state(State.STREAMING);
 
         for (StreamTransferTask task : transfers.values())
         {
             Collection<OutgoingFileMessage> messages = task.getFileMessages();
-            if (messages.size() > 0)
-                handler.sendMessages(messages);
-            else
-                taskCompleted(task); // there is no file to send
-        }
-    }
-
-    class KeepAliveTask implements Runnable
-    {
-        private KeepAliveMessage last = null;
-
-        public void run()
-        {
-            //to avoid jamming the message queue, we only send if the last one was sent
-            if (last == null || last.wasSent())
+            if (!messages.isEmpty())
             {
-                logger.trace("[Stream #{}] Sending keep-alive to {}.", planId(), peer);
-                last = new KeepAliveMessage();
-                try
+                for (OutgoingFileMessage ofm : messages)
                 {
-                    handler.sendMessage(last);
-                }
-                catch (RuntimeException e) //connection handler is closed
-                {
-                    logger.debug("[Stream #{}] Could not send keep-alive message (perhaps stream session is finished?).", planId(), e);
+                    // pass the session planId/index to the OFM (which is only set at init(), after the transfers have already been created)
+                    ofm.header.addSessionInfo(this);
+                    messageSender.sendMessage(ofm);
                 }
             }
             else
             {
-                logger.trace("[Stream #{}] Skip sending keep-alive to {} (previous was not yet sent).", planId(), peer);
+                taskCompleted(task); // there are no files to send
             }
         }
+        maybeCompleted();
     }
 }

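For a sense of how the life-cycle documented above is driven in practice, here is a rough sketch of initiating a session via StreamPlan. The requestRanges/execute signatures are assumed from this branch and may differ; treat it as illustrative only.

    import java.net.InetAddress;
    import java.util.Collection;

    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    public class StreamPlanSketch
    {
        // Kicks off section 1 of the life-cycle: execute() sends the
        // StreamInitMessage, after which the prepare exchange
        // (PrepareSynMessage / PrepareSynAckMessage / PrepareAckMessage) runs.
        static StreamResultFuture fetch(InetAddress peer, String keyspace, Collection<Range<Token>> ranges)
        {
            StreamPlan plan = new StreamPlan(StreamOperation.BOOTSTRAP);
            plan.requestRanges(peer, peer, keyspace, ranges); // peer doubles as the connecting address here
            return plan.execute();
        }
    }
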
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 748da8b..5e21712 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -20,11 +20,15 @@ package org.apache.cassandra.streaming;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
@@ -37,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
  */
 public class StreamTransferTask extends StreamTask
 {
+    private static final Logger logger = LoggerFactory.getLogger(StreamTransferTask.class);
     private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts"));
 
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
@@ -56,10 +61,10 @@ public class StreamTransferTask extends StreamTask
     public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections)
     {
         assert ref.get() != null && tableId.equals(ref.get().metadata().id);
-        OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel());
+        OutgoingFileMessage message = new OutgoingFileMessage(ref, session, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel());
         message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }
 
     /**
@@ -80,6 +85,7 @@ public class StreamTransferTask extends StreamTask
             if (file != null)
                 file.complete();
 
+            logger.debug("recevied sequenceNumber {}, remaining files {}", sequenceNumber, files.keySet());
             signalComplete = files.isEmpty();
         }
 

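The timeoutExecutor added at the top of this class serves an arm-and-cancel idiom: a per-file timeout is scheduled, and cancelled when the peer's ReceivedMessage completes the file first. A generic, self-contained sketch of that idiom (not this class's exact logic):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class TransferTimeoutSketch
    {
        private static final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final ConcurrentMap<Integer, ScheduledFuture<?>> timeouts = new ConcurrentHashMap<>();

        // arm a per-file timeout after sending the file
        void armTimeout(int sequenceNumber, Runnable onTimeout, long delay, TimeUnit unit)
        {
            timeouts.put(sequenceNumber, timer.schedule(onTimeout, delay, unit));
        }

        // on receipt of the peer's acknowledgement, cancel the pending timeout
        void complete(int sequenceNumber)
        {
            ScheduledFuture<?> f = timeouts.remove(sequenceNumber);
            if (f != null)
                f.cancel(false);
        }
    }
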
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 6c86c8b..81b3d8a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -19,21 +19,21 @@ package org.apache.cassandra.streaming;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.compress.lzf.LZFOutputStream;
-
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -51,11 +51,6 @@ public class StreamWriter
     protected final StreamRateLimiter limiter;
     protected final StreamSession session;
 
-    private OutputStream compressedOutput;
-
-    // allocate buffer to use for transfers only once
-    private byte[] transferBuffer;
-
     public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session)
     {
         this.session = session;
@@ -78,45 +73,48 @@ public class StreamWriter
         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 
-        try(RandomAccessReader file = sstable.openDataReader();
+        try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy();
             ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
                                           ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
-                                          : null;)
+                                          : null)
         {
-            transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
+            int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE : validator.chunkSize;
 
             // setting up data compression stream
-            compressedOutput = new LZFOutputStream(output);
             long progress = 0L;
 
-            // stream each of the required sections of the file
-            for (Pair<Long, Long> section : sections)
+            try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter))
             {
-                long start = validator == null ? section.left : validator.chunkStart(section.left);
-                int readOffset = (int) (section.left - start);
-                // seek to the beginning of the section
-                file.seek(start);
-                if (validator != null)
-                    validator.seek(start);
-
-                // length of the section to read
-                long length = section.right - start;
-                // tracks write progress
-                long bytesRead = 0;
-                while (bytesRead < length)
+                // stream each of the required sections of the file
+                for (Pair<Long, Long> section : sections)
                 {
-                    long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
-                    bytesRead += lastBytesRead;
-                    progress += (lastBytesRead - readOffset);
-                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
-                    readOffset = 0;
+                    long start = validator == null ? section.left : validator.chunkStart(section.left);
+                    // if the transfer does not start on the validator's chunk boundary, this is the number of bytes to offset by
+                    int transferOffset = (int) (section.left - start);
+                    if (validator != null)
+                        validator.seek(start);
+
+                    // length of the section to read
+                    long length = section.right - start;
+                    // tracks write progress
+                    long bytesRead = 0;
+                    while (bytesRead < length)
+                    {
+                        int toTransfer = (int) Math.min(bufferSize, length - bytesRead);
+                        long lastBytesRead = write(proxy, validator, compressedOutput, start, transferOffset, toTransfer, bufferSize);
+                        start += lastBytesRead;
+                        bytesRead += lastBytesRead;
+                        progress += (lastBytesRead - transferOffset);
+                        session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+                        transferOffset = 0;
+                    }
+
+                    // make sure that current section is sent
+                    output.flush();
                 }
-
-                // make sure that current section is sent
-                compressedOutput.flush();
+                logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                             session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
             }
-            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
-                         session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
         }
     }
 
@@ -131,27 +129,44 @@ public class StreamWriter
     /**
      * Sequentially read bytes from the file and write them to the output stream
      *
-     * @param reader The file reader to read from
+     * @param proxy The file reader to read from
      * @param validator validator to verify data integrity
-     * @param start number of bytes to skip transfer, but include for validation.
-     * @param length The full length that should be read from {@code reader}
-     * @param bytesTransferred Number of bytes already read out of {@code length}
+     * @param start The read offset from the beginning of the {@code proxy} file.
+     * @param transferOffset number of bytes to skip transfer, but include for validation.
+     * @param toTransfer The number of bytes to be transferred.
      *
-     * @return Number of bytes read
+     * @return Number of bytes transferred.
      *
      * @throws java.io.IOException on any I/O error
      */
-    protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
+    protected long write(ChannelProxy proxy, ChecksumValidator validator, DataOutputStreamPlus output, long start, int transferOffset, int toTransfer, int bufferSize) throws IOException
     {
-        int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
-        int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
+        // the count of bytes to read off disk
+        int minReadable = (int) Math.min(bufferSize, proxy.size() - start);
 
-        reader.readFully(transferBuffer, 0, minReadable);
-        if (validator != null)
-            validator.validate(transferBuffer, 0, minReadable);
+        // this buffer will hold the data read from disk. as it is compressed on the fly by
+        // ByteBufCompressionDataOutputStreamPlus.write(ByteBuffer), we can release it as soon as the write returns.
+        ByteBuffer buffer = ByteBuffer.allocateDirect(minReadable);
+        try
+        {
+            int readCount = proxy.read(buffer, start);
+            assert readCount == minReadable : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", readCount, minReadable);
+            buffer.flip();
 
-        limiter.acquire(toTransfer - start);
-        compressedOutput.write(transferBuffer, start, (toTransfer - start));
+            if (validator != null)
+            {
+                validator.validate(buffer);
+                buffer.flip();
+            }
+
+            buffer.position(transferOffset);
+            buffer.limit(transferOffset + (toTransfer - transferOffset));
+            output.write(buffer);
+        }
+        finally
+        {
+            FileUtils.clean(buffer);
+        }
 
         return toTransfer;
     }
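
The position/limit dance in write() is easy to misread: the whole buffer is validated, then the window is narrowed so only the bytes past transferOffset are written. A standalone sketch of that windowing under the same assumptions (WindowSketch is a hypothetical name, not part of this patch):

import java.nio.ByteBuffer;

public final class WindowSketch
{
    public static void main(String[] args)
    {
        ByteBuffer buffer = ByteBuffer.allocateDirect(16);
        for (int i = 0; i < 16; i++)
            buffer.put((byte) i);
        buffer.flip(); // a validator would consume the full window here

        int transferOffset = 4; // validated but not transferred
        int toTransfer = 12;    // bytes accounted for in this chunk

        buffer.position(transferOffset);
        buffer.limit(transferOffset + (toTransfer - transferOffset)); // == toTransfer
        System.out.println("bytes to write: " + buffer.remaining());  // prints 8
    }
}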

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
new file mode 100644
index 0000000..9562981
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+public interface StreamingMessageSender
+{
+    void initialize() throws IOException;
+
+    void sendMessage(StreamMessage message) throws IOException;
+
+    boolean connected();
+
+    void close();
+}
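
To make the contract concrete, here is a hedged sketch of a trivial implementation, the kind a unit test might use as a stand-in for the netty-backed sender; RecordingMessageSender and its members are hypothetical, not part of this patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.streaming.StreamingMessageSender;
import org.apache.cassandra.streaming.messages.StreamMessage;

public class RecordingMessageSender implements StreamingMessageSender
{
    private final List<StreamMessage> sent = new ArrayList<>();
    private volatile boolean connected;

    @Override
    public void initialize() { connected = true; }

    @Override
    public void sendMessage(StreamMessage message)
    {
        if (!connected)
            throw new IllegalStateException("sender is closed");
        sent.add(message); // record instead of writing to a channel
    }

    @Override
    public boolean connected() { return connected; }

    @Override
    public void close() { connected = false; }

    List<StreamMessage> sentMessages() { return sent; }
}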

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
new file mode 100644
index 0000000..f872005
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.NettyFactory;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.streaming.StreamConnectionFactory;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamingMessageSender;
+import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.KeepAliveMessage;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
+ * for sending {@link OutgoingFileMessage} instances; all other {@link StreamMessage} types are sent via
+ * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck
+ * behind a file transfer.
+ *
+ * One of the challenges when sending files is that we might need to delay shipping the file if:
+ *
+ * - we've exceeded our network I/O use due to rate limiting (at the cassandra level)
+ * - the receiver isn't keeping up, which means the local TCP socket buffer doesn't empty, so epoll writes move no
+ * bytes to the socket, and buffers pile up in user-land (a/k/a cassandra) memory.
+ *
+ * When those conditions occur, it's easy enough to reschedule processing the file once the resources pick up
+ * (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that
+ * no other messages are submitted to the same channel while the current file is still being processed.
+ */
+public class NettyStreamingMessageSender implements StreamingMessageSender
+{
+    private static final Logger logger = LoggerFactory.getLogger(NettyStreamingMessageSender.class);
+
+    private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = FBUtilities.getAvailableProcessors();
+    private static final int MAX_PARALLEL_TRANSFERS = Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
+
+    // a simple mechanism for allowing a degree of fairness across multiple sessions
+    private static final Semaphore fileTransferSemaphore = new Semaphore(MAX_PARALLEL_TRANSFERS, true);
+
+    private final StreamSession session;
+    private final boolean isPreview;
+    private final int protocolVersion;
+    private final OutboundConnectionIdentifier connectionId;
+    private final StreamConnectionFactory factory;
+
+    private volatile boolean closed;
+
+    /**
+     * A special {@link Channel} for sending non-file streaming messages, basically anything that isn't an
+     * {@link OutgoingFileMessage} (or an {@link IncomingFileMessage}, but a node doesn't send that, it's only received).
+     */
+    private Channel controlMessageChannel;
+
+    // note: this really doesn't need to be a LBQ, just something that's thread safe
+    private final Collection<ScheduledFuture<?>> channelKeepAlives = new LinkedBlockingQueue<>();
+
+    private final ThreadPoolExecutor fileTransferExecutor;
+
+    /**
+     * A mapping of thread to channel, used like a {@link ThreadLocal} by the threads in {@link #fileTransferExecutor}
+     * to stash references to constructed and connected {@link Channel}s.
+     */
+    private final ConcurrentMap<Thread, Channel> threadLocalChannel = new ConcurrentHashMap<>();
+
+    /**
+     * A netty channel attribute used to indicate if a channel is currently transferring a file. This is primarily used
+     * to indicate to the {@link KeepAliveTask} if it is safe to send a {@link KeepAliveMessage}, as sending the
+     * (application level) keep-alive in the middle of streaming a file would be bad news.
+     */
+    @VisibleForTesting
+    static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR = AttributeKey.valueOf("transferringFile");
+
+    public NettyStreamingMessageSender(StreamSession session, OutboundConnectionIdentifier connectionId, StreamConnectionFactory factory, int protocolVersion, boolean isPreview)
+    {
+        this.session = session;
+        this.protocolVersion = protocolVersion;
+        this.connectionId = connectionId;
+        this.factory = factory;
+        this.isPreview = isPreview;
+
+        String name = session.peer.toString().replace(':', '.');
+        fileTransferExecutor = new DebuggableThreadPoolExecutor(1, MAX_PARALLEL_TRANSFERS, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+                                                                new NamedThreadFactory("NettyStreaming-Outbound-" + name));
+        fileTransferExecutor.allowCoreThreadTimeOut(true);
+    }
+
+    @Override
+    public void initialize() throws IOException
+    {
+        StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
+                                                          session.sessionIndex(),
+                                                          session.planId(),
+                                                          session.streamOperation(),
+                                                          session.keepSSTableLevel(),
+                                                          session.getPendingRepair(),
+                                                          session.getPreviewKind());
+        sendMessage(message);
+    }
+
+    public boolean hasControlChannel()
+    {
+        return controlMessageChannel != null;
+    }
+
+    public void injectControlMessageChannel(Channel channel)
+    {
+        this.controlMessageChannel = channel;
+        channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+        scheduleKeepAliveTask(channel);
+    }
+
+    private void setupControlMessageChannel() throws IOException
+    {
+        if (controlMessageChannel == null)
+        {
+            controlMessageChannel = createChannel();
+            scheduleKeepAliveTask(controlMessageChannel);
+        }
+    }
+
+    private void scheduleKeepAliveTask(Channel channel)
+    {
+        int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
+        logger.debug("{} Scheduling keep-alive task with {}s period.", createLogTag(session, channel), keepAlivePeriod);
+
+        KeepAliveTask task = new KeepAliveTask(channel, session);
+        ScheduledFuture<?> scheduledFuture = channel.eventLoop().scheduleAtFixedRate(task, 0, keepAlivePeriod, TimeUnit.SECONDS);
+        channelKeepAlives.add(scheduledFuture);
+        task.future = scheduledFuture;
+    }
+    
+    private Channel createChannel() throws IOException
+    {
+        Channel channel = factory.createConnection(connectionId, protocolVersion);
+        ChannelPipeline pipeline = channel.pipeline();
+        pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remoteAddress(), protocolVersion, session));
+        channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+        return channel;
+    }
+
+    static String createLogTag(StreamSession session, Channel channel)
+    {
+        StringBuilder sb = new StringBuilder(64);
+        sb.append("[Stream");
+
+        if (session != null)
+            sb.append(" #").append(session.planId());
+
+        if (channel != null)
+            sb.append(" channel: ").append(channel.id());
+
+        sb.append(']');
+        return sb.toString();
+    }
+
+    @Override
+    public void sendMessage(StreamMessage message)
+    {
+        if (closed)
+            throw new RuntimeException("stream has been closed, cannot send " + message);
+
+        if (message instanceof OutgoingFileMessage)
+        {
+            if (isPreview)
+                throw new RuntimeException("Cannot send file messages for preview streaming sessions");
+            logger.debug("{} Sending {}", createLogTag(session, null), message);
+            fileTransferExecutor.submit(new FileStreamTask((OutgoingFileMessage)message));
+            return;
+        }
+
+        try
+        {
+            setupControlMessageChannel();
+            sendControlMessage(controlMessageChannel, message, future -> onControlMessageComplete(future, message));
+        }
+        catch (Exception e)
+        {
+            close();
+            session.onError(e);
+        }
+    }
+
+    private void sendControlMessage(Channel channel, StreamMessage message, GenericFutureListener listener) throws IOException
+    {
+        logger.debug("{} Sending {}", createLogTag(session, channel), message);
+
+        // we anticipate that the control messages are rather small, so allocating a ByteBuf for one shouldn't exhaust memory.
+        long messageSize = StreamMessage.serializedSize(message, protocolVersion);
+        if (messageSize > 1 << 30)
+        {
+            throw new IllegalStateException(String.format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s",
+                                                          createLogTag(session, channel), messageSize, message.type));
+        }
+
+        // as control messages are (expected to be) small, we can simply allocate a ByteBuf here, wrap it, and send via the channel
+        ByteBuf buf = channel.alloc().directBuffer((int) messageSize, (int) messageSize);
+        ByteBuffer nioBuf = buf.nioBuffer(0, (int) messageSize);
+        @SuppressWarnings("resource")
+        DataOutputBufferFixed out = new DataOutputBufferFixed(nioBuf);
+        StreamMessage.serialize(message, out, protocolVersion, session);
+        assert nioBuf.position() == nioBuf.limit();
+        buf.writerIndex(nioBuf.position());
+
+        ChannelFuture channelFuture = channel.writeAndFlush(buf);
+        channelFuture.addListener(future -> listener.operationComplete(future));
+    }
+
+    /**
+     * Decides what to do after a {@link StreamMessage} is processed.
+     *
+     * Note: this is called from the netty event loop.
+     *
+     * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
+     * the status of aborting any remaining tasks in the session.
+     */
+    java.util.concurrent.Future onControlMessageComplete(Future<?> future, StreamMessage msg)
+    {
+        ChannelFuture channelFuture = (ChannelFuture)future;
+        Throwable cause = future.cause();
+        if (cause == null)
+            return null;
+
+        Channel channel = channelFuture.channel();
+        logger.error("{} failed to send a stream message/file to peer {}: msg = {}",
+                     createLogTag(session, channel), connectionId, msg, future.cause());
+
+        // StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try
+        // to send any failure messages
+        return session.onError(cause);
+    }
+
+    class FileStreamTask implements Runnable
+    {
+        /**
+         * Time interval, in minutes, to wait between logging a message indicating that we're waiting on a semaphore
+         * permit to become available.
+         */
+        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
+
+        /**
+         * Even though we expect only an {@link OutgoingFileMessage} at runtime, the type here is {@link StreamMessage}
+         * to facilitate simpler testing.
+         */
+        private final StreamMessage msg;
+
+        FileStreamTask(OutgoingFileMessage ofm)
+        {
+            this.msg = ofm;
+        }
+
+        /**
+         * For testing purposes
+         */
+        FileStreamTask(StreamMessage msg)
+        {
+            this.msg = msg;
+        }
+
+        @Override
+        public void run()
+        {
+            if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
+                return;
+
+            try
+            {
+                Channel channel = getOrCreateChannel();
+                if (!channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet(false, true))
+                    throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
+
+                // close the DataOutputStreamPlus as we're done with it - but don't close the channel
+                try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 16))
+                {
+                    StreamMessage.serialize(msg, outPlus, protocolVersion, session);
+                    channel.flush();
+                }
+                finally
+                {
+                    channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+                }
+            }
+            catch (Exception e)
+            {
+                session.onError(e);
+            }
+            finally
+            {
+                fileTransferSemaphore.release();
+            }
+        }
+
+        boolean acquirePermit(int logInterval)
+        {
+            long logIntervalNanos = TimeUnit.MINUTES.toNanos(logInterval);
+            long timeOfLastLogging = System.nanoTime();
+            while (true)
+            {
+                if (closed)
+                    return false;
+                try
+                {
+                    if (fileTransferSemaphore.tryAcquire(1, TimeUnit.SECONDS))
+                        return true;
+
+                    // log a helpful message to operators in case they are wondering why a given session might not be making progress.
+                    long now = System.nanoTime();
+                    if (now - timeOfLastLogging > logIntervalNanos)
+                    {
+                        timeOfLastLogging = now;
+                        OutgoingFileMessage ofm = (OutgoingFileMessage)msg;
+                        logger.info("{} waiting to acquire a permit to begin streaming file {}. This message logs every {} minutes",
+                                    createLogTag(session, null), ofm.getFilename(), logInterval);
+                    }
+                }
+                catch (InterruptedException ie)
+                {
+                    //ignore
+                }
+            }
+        }
+
+        private Channel getOrCreateChannel()
+        {
+            Thread currentThread = Thread.currentThread();
+            try
+            {
+                Channel channel = threadLocalChannel.get(currentThread);
+                if (channel != null)
+                    return channel;
+
+                channel = createChannel();
+                threadLocalChannel.put(currentThread, channel);
+                return channel;
+            }
+            catch (Exception e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        /**
+         * For testing purposes
+         */
+        void injectChannel(Channel channel)
+        {
+            Thread currentThread = Thread.currentThread();
+            if (threadLocalChannel.get(currentThread) != null)
+                throw new IllegalStateException("previous channel already set");
+
+            threadLocalChannel.put(currentThread, channel);
+        }
+
+        /**
+         * For testing purposes
+         */
+        void unsetChannel()
+        {
+            threadLocalChannel.remove(Thread.currentThread());
+        }
+    }
+
+    /**
+     * Periodically sends the {@link KeepAliveMessage}.
+     *
+ * NOTE: this task and the callback function {@link #keepAliveListener(Future)} are executed in the netty event loop.
+     */
+    class KeepAliveTask implements Runnable
+    {
+        private final Channel channel;
+        private final StreamSession session;
+
+        /**
+         * A reference to the scheduled task for this instance so that it may be cancelled.
+         */
+        ScheduledFuture<?> future;
+
+        KeepAliveTask(Channel channel, StreamSession session)
+        {
+            this.channel = channel;
+            this.session = session;
+        }
+
+        public void run()
+        {
+            // if the channel has been closed, cancel the scheduled task and return
+            if (!channel.isOpen() || closed)
+            {
+                future.cancel(false);
+                return;
+            }
+
+            // if the channel is currently processing streaming, skip this execution. As this task executes
+            // on the event loop, even if there is a race with a FileStreamTask which changes the channel attribute
+            // after we check it, the FileStreamTask cannot send out any bytes while this KeepAliveTask is executing
+            // on the event loop (and FileStreamTask publishes its buffer to the channel, consumed after we're done here).
+            if (channel.attr(TRANSFERRING_FILE_ATTR).get())
+                return;
+
+            try
+            {
+                logger.trace("{} Sending keep-alive to {}.", createLogTag(session, channel), session.peer);
+                sendControlMessage(channel, new KeepAliveMessage(), this::keepAliveListener);
+            }
+            catch (IOException ioe)
+            {
+                future.cancel(false);
+            }
+        }
+
+        private void keepAliveListener(Future<? super Void> future)
+        {
+            if (future.isSuccess() || future.isCancelled())
+                return;
+
+            logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).",
+                         createLogTag(session, channel), future.cause());
+        }
+    }
+
+    /**
+     * For testing purposes only.
+     */
+    void setClosed()
+    {
+        closed = true;
+    }
+
+    void setControlMessageChannel(Channel channel)
+    {
+        controlMessageChannel = channel;
+    }
+
+    int semaphoreAvailablePermits()
+    {
+        return fileTransferSemaphore.availablePermits();
+    }
+
+    @Override
+    public boolean connected()
+    {
+        return !closed;
+    }
+
+    @Override
+    public void close()
+    {
+        closed = true;
+        logger.debug("{} Closing stream connection channels on {}", createLogTag(session, null), connectionId);
+        channelKeepAlives.forEach(scheduledFuture -> scheduledFuture.cancel(false));
+        channelKeepAlives.clear();
+
+        threadLocalChannel.values().forEach(channel -> channel.close());
+        threadLocalChannel.clear();
+        fileTransferExecutor.shutdownNow();
+
+        if (controlMessageChannel != null)
+            controlMessageChannel.close();
+    }
+}
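
The TRANSFERRING_FILE_ATTR guard is the crux of the keep-alive/file-transfer coordination: the transfer path claims the channel with a compare-and-set, and the keep-alive path merely reads the flag. A minimal sketch of that pattern, assuming netty's EmbeddedChannel as a stand-in for a real connection (GuardSketch is an illustrative name, not part of this patch):

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.AttributeKey;

public final class GuardSketch
{
    static final AttributeKey<Boolean> TRANSFERRING = AttributeKey.valueOf("transferringFile");

    public static void main(String[] args)
    {
        EmbeddedChannel channel = new EmbeddedChannel();
        channel.attr(TRANSFERRING).set(Boolean.FALSE);

        // file-transfer path: claim the channel exclusively, or bail out
        if (channel.attr(TRANSFERRING).compareAndSet(false, true))
        {
            try
            {
                // ... serialize and stream the file here ...
            }
            finally
            {
                channel.attr(TRANSFERRING).set(Boolean.FALSE);
            }
        }

        // keep-alive path: only send when no transfer is in flight
        if (!channel.attr(TRANSFERRING).get())
            System.out.println("safe to send keep-alive");

        channel.close();
    }
}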

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
new file mode 100644
index 0000000..ca15b78
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+/**
+ * A serializer for stream compressed files (see package-level documentation). Much like a typical compressed
+ * output stream, this class operates on buffers or chunks of the data at a time. The format for each compressed
+ * chunk is as follows:
+ *
+ * - int - compressed payload length
+ * - int - uncompressed payload length
+ * - bytes - compressed payload
+ */
+public class StreamCompressionSerializer
+{
+    private final ByteBufAllocator allocator;
+
+    public StreamCompressionSerializer(ByteBufAllocator allocator)
+    {
+        this.allocator = allocator;
+    }
+
+    /**
+     * Length of the header data, which consists of the compressed length and the uncompressed length (two ints).
+     */
+    private static final int HEADER_LENGTH = 8;
+
+    /**
+     * @return A buffer containing the compressed data, prefixed by the chunk header.
+     */
+    public ByteBuf serialize(LZ4Compressor compressor, ByteBuffer in, int version)
+    {
+        final int uncompressedLength = in.remaining();
+        int maxLength = compressor.maxCompressedLength(uncompressedLength);
+        ByteBuf out = allocator.directBuffer(maxLength);
+        try
+        {
+            ByteBuffer compressedNioBuffer = out.nioBuffer(HEADER_LENGTH, maxLength - HEADER_LENGTH);
+            compressor.compress(in, compressedNioBuffer);
+            final int compressedLength = compressedNioBuffer.position();
+            out.setInt(0, compressedLength);
+            out.setInt(4, uncompressedLength);
+            out.writerIndex(HEADER_LENGTH + compressedLength);
+        }
+        catch (Exception e)
+        {
+            // release the buffer and propagate the error; falling through to return a released buffer would be unsafe
+            out.release();
+            throw new RuntimeException("failed to compress chunk for streaming", e);
+        }
+        return out;
+    }
+
+    /**
+     * @return A buffer with decompressed data.
+     */
+    public ByteBuf deserialize(LZ4FastDecompressor decompressor, DataInputPlus in, int version) throws IOException
+    {
+        final int compressedLength = in.readInt();
+        final int uncompressedLength = in.readInt();
+
+        // there's no guarantee the next compressed block is contained within one buffer in the input,
+        // hence we need a 'staging' buffer to get all the bytes into one contiguous buffer for the decompressor
+        ByteBuf compressed = null;
+        ByteBuf uncompressed = null;
+        try
+        {
+            final ByteBuffer compressedNioBuffer;
+
+            // ReadableByteChannel allows us to keep the bytes off-heap because we can pass a ByteBuffer to RBC.read(BB),
+            // whereas DataInputPlus.read() takes a byte array (thus, an on-heap array).
+            if (in instanceof ReadableByteChannel)
+            {
+                compressed = allocator.directBuffer(compressedLength);
+                compressedNioBuffer = compressed.nioBuffer(0, compressedLength);
+                int readLength = ((ReadableByteChannel) in).read(compressedNioBuffer);
+                assert readLength == compressedNioBuffer.position();
+                compressedNioBuffer.flip();
+            }
+            else
+            {
+                byte[] compressedBytes = new byte[compressedLength];
+                in.readFully(compressedBytes);
+                compressedNioBuffer = ByteBuffer.wrap(compressedBytes);
+            }
+
+            uncompressed = allocator.directBuffer(uncompressedLength);
+            ByteBuffer uncompressedNioBuffer = uncompressed.nioBuffer(0, uncompressedLength);
+            decompressor.decompress(compressedNioBuffer, uncompressedNioBuffer);
+            uncompressed.writerIndex(uncompressedLength);
+            return uncompressed;
+        }
+        catch (Exception e)
+        {
+            if (uncompressed != null)
+                uncompressed.release();
+
+            if (e instanceof IOException)
+                throw e;
+            throw new IOException(e);
+        }
+        finally
+        {
+            if (compressed != null)
+                compressed.release();
+        }
+    }
+}
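
The chunk format is simple enough to demonstrate end to end. Below is a hedged round-trip of the [int compressedLength][int uncompressedLength][payload] framing using plain heap arrays and lz4-java, with the netty buffer plumbing elided (ChunkFormatSketch is an illustrative name, not part of this patch):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

public final class ChunkFormatSketch
{
    public static void main(String[] args)
    {
        byte[] payload = "the quick brown fox jumps over the lazy dog".getBytes(StandardCharsets.UTF_8);
        LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();

        // serialize: 8-byte header (two ints) followed by the compressed payload
        byte[] compressed = new byte[compressor.maxCompressedLength(payload.length)];
        int compressedLength = compressor.compress(payload, 0, payload.length, compressed, 0);
        ByteBuffer chunk = ByteBuffer.allocate(8 + compressedLength);
        chunk.putInt(compressedLength).putInt(payload.length);
        chunk.put(compressed, 0, compressedLength);
        chunk.flip();

        // deserialize: read the header, then decompress exactly that many bytes
        int cLen = chunk.getInt();
        int uLen = chunk.getInt();
        byte[] restored = new byte[uLen];
        LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
        decompressor.decompress(chunk.array(), chunk.position(), restored, 0, uLen);
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}

Note that cLen is read only to honor the frame layout; the fast decompressor is driven by the uncompressed length alone.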

