Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:09 UTC

[02/11] cassandra git commit: move streaming to use netty

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
new file mode 100644
index 0000000..cc6f9e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamReceiveException;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.KeepAliveMessage;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag;
+
+/**
+ * Handles the inbound side of streaming messages and sstable data. From the incoming data, we deserialize the message
+ * and potentially reify partitions and rows, writing those out to new sstable files. Because deserialization is a blocking affair,
+ * we can't block the netty event loop; thus a background thread performs all the blocking deserialization.
+ */
+public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamingInboundHandler.class);
+    static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
+
+    private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
+    private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
+
+    private final InetSocketAddress remoteAddress;
+    private final int protocolVersion;
+
+    private final StreamSession session;
+
+    /**
+     * A collection of {@link ByteBuf}s that are yet to be processed. Incoming buffers are first dropped into this
+     * structure, and then consumed.
+     * <p>
+     * For thread safety, this structure's resources are released on the consuming thread
+     * (via {@link RebufferingByteBufDataInputPlus#close()}),
+     * while the producing side calls {@link RebufferingByteBufDataInputPlus#markClose()} to notify the input that it should close.
+     */
+    private RebufferingByteBufDataInputPlus buffers;
+
+    private volatile boolean closed;
+
+    public StreamingInboundHandler(InetSocketAddress remoteAddress, int protocolVersion, @Nullable StreamSession session)
+    {
+        this.remoteAddress = remoteAddress;
+        this.protocolVersion = protocolVersion;
+        this.session = session;
+    }
+
+    @Override
+    @SuppressWarnings("resource")
+    public void handlerAdded(ChannelHandlerContext ctx)
+    {
+        buffers = new RebufferingByteBufDataInputPlus(AUTO_READ_LOW_WATER_MARK, AUTO_READ_HIGH_WATER_MARK, ctx.channel().config());
+        Thread blockingIOThread = new FastThreadLocalThread(new StreamDeserializingTask(DEFAULT_SESSION_PROVIDER, session, ctx.channel()),
+                                                            String.format("Stream-Deserializer-%s-%s", remoteAddress.toString(), ctx.channel().id()));
+        blockingIOThread.setDaemon(true);
+        blockingIOThread.start();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object message)
+    {
+        if (!closed && message instanceof ByteBuf)
+            buffers.append((ByteBuf) message);
+        else
+            ReferenceCountUtil.release(message);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx)
+    {
+        close();
+        ctx.fireChannelInactive();
+    }
+
+    void close()
+    {
+        closed = true;
+        buffers.markClose();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+    {
+        if (cause instanceof IOException)
+            logger.trace("connection problem while streaming", cause);
+        else
+            logger.warn("exception occurred while processing streaming file", cause);
+        close();
+    }
+
+    /**
+     * For testing only!!
+     */
+    void setPendingBuffers(RebufferingByteBufDataInputPlus bufChannel)
+    {
+        this.buffers = bufChannel;
+    }
+
+    /**
+     * The task that performs the actual deserialization.
+     */
+    class StreamDeserializingTask implements Runnable
+    {
+        private final Function<SessionIdentifier, StreamSession> sessionProvider;
+        private final Channel channel;
+
+        @VisibleForTesting
+        StreamSession session;
+
+        StreamDeserializingTask(Function<SessionIdentifier, StreamSession> sessionProvider, StreamSession session, Channel channel)
+        {
+            this.sessionProvider = sessionProvider;
+            this.session = session;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (true)
+                {
+                    // do a check of available bytes and possibly sleep some amount of time (then continue).
+                    // this way we can break out of run() sanely; otherwise we end up blocking indefinitely in StreamMessage.deserialize()
+                    while (buffers.available() == 0)
+                    {
+                        if (closed)
+                            return;
+
+                        Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
+                    }
+
+                    StreamMessage message = StreamMessage.deserialize(buffers, protocolVersion, null);
+
+                    // keep-alives don't necessarily need to be tied to a session (they could arrive before or after
+                    // the session lifecycle, due to races); just log that we received the message and carry on
+                    if (message instanceof KeepAliveMessage)
+                    {
+                        logger.debug("{} Received {}", createLogTag(session, channel), message);
+                        continue;
+                    }
+
+                    if (session == null)
+                        session = deriveSession(message);
+                    logger.debug("{} Received {}", createLogTag(session, channel), message);
+                    session.messageReceived(message);
+                }
+            }
+            catch (EOFException eof)
+            {
+                // ignore
+            }
+            catch (Throwable t)
+            {
+                JVMStabilityInspector.inspectThrowable(t);
+                if (session != null)
+                {
+                    session.onError(t);
+                }
+                else if (t instanceof StreamReceiveException)
+                {
+                    ((StreamReceiveException)t).session.onError(t);
+                }
+                else
+                {
+                    logger.error("{} stream operation from {} failed", createLogTag(session, channel), remoteAddress, t);
+                }
+            }
+            finally
+            {
+                channel.close();
+                closed = true;
+
+                if (buffers != null)
+                    buffers.close();
+            }
+        }
+
+        StreamSession deriveSession(StreamMessage message) throws IOException
+        {
+            StreamSession streamSession = null;
+            // StreamInitMessage starts a new channel, and IncomingFileMessage potentially does, as well.
+            // IncomingFileMessage, however, needs a session to have been established a priori
+            if (message instanceof StreamInitMessage)
+            {
+                assert session == null : "initiator of stream session received a StreamInitMessage";
+                StreamInitMessage init = (StreamInitMessage) message;
+                StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.keepSSTableLevel, init.pendingRepair, init.previewKind);
+                streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex));
+            }
+            else if (message instanceof IncomingFileMessage)
+            {
+                // TODO: it'd be great to check if the session actually exists before slurping in the entire sstable,
+                // but that's a refactoring for another day
+                FileMessageHeader header = ((IncomingFileMessage) message).header;
+                streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex));
+            }
+
+            if (streamSession == null)
+                throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
+
+            streamSession.attach(channel);
+            return streamSession;
+        }
+    }
+
+    /**
+     * A simple struct to wrap the data points required to look up a {@link StreamSession}
+     */
+    static class SessionIdentifier
+    {
+        final InetAddress from;
+        final UUID planId;
+        final int sessionIndex;
+
+        SessionIdentifier(InetAddress from, UUID planId, int sessionIndex)
+        {
+            this.from = from;
+            this.planId = planId;
+            this.sessionIndex = sessionIndex;
+        }
+    }
+}
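
For illustration, a minimal sketch of the handoff pattern the handler above relies on: the event
loop appends buffers without blocking, while a dedicated thread is free to block as it consumes
them. The class and method names below are illustrative, not part of this patch.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    final class InboundBufferQueue
    {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        private volatile boolean closed;

        // producer side: invoked on the netty event loop, must never block
        void append(byte[] buf)
        {
            if (!closed)
                queue.offer(buf);
        }

        // consumer side: runs on a dedicated thread, where blocking is acceptable
        void consumeLoop() throws InterruptedException
        {
            while (!closed)
            {
                byte[] buf = queue.take(); // blocks until a buffer arrives
                // ... deserialize a message from buf ...
            }
        }

        void markClose()
        {
            closed = true;
        }
    }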

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/package-info.java b/src/java/org/apache/cassandra/streaming/async/package-info.java
new file mode 100644
index 0000000..ecf5115
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/package-info.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h1>Non-blocking streaming with netty</h1>
+ * This document describes the implementation details of the streaming protocol. A listener for a streaming
+ * session listens on the same socket as internode messaging, and participates in the same handshake protocol.
+ * That protocol is described in the package-level documentation for {@link org.apache.cassandra.net.async}, and
+ * thus not here.
+ *
+ * Streaming 2.0 was implemented as CASSANDRA-5286. Streaming 2.0 used (the equivalent of) a single thread and
+ * a single socket to transfer sstables sequentially to a peer (as part of a repair, bootstrap, and so on).
+ * Part of the motivation for switching to netty and a non-blocking model was to enable file transfers to occur
+ * in parallel for a given session.
+ *
+ * Thus, a more detailed approach is required for stream session management.
+ *
+ * <h2>Session setup and management</h2>
+ *
+ * The full details of the session lifecycle are documented in {@link org.apache.cassandra.streaming.StreamSession}.
+ *
+ *
+ * <h2>File transfer</h2>
+ *
+ * When transferring a whole sstable or subsections of an sstable, only the DATA component is shipped. To that end,
+ * there are three "modes" of sstable transfer that need to be handled somewhat differently:
+ *
+ * 1) uncompressed sstable - data needs to be read into user space so it can be manipulated: checksum validation,
+ * stream compression (see next section), and/or TLS encryption.
+ *
+ * 2) compressed sstable, transferred with SSL/TLS - data needs to be read into user space as that is where the TLS encryption
+ * needs to happen. Netty does not allow the pretense of doing zero-copy transfers when TLS is in the pipeline;
+ * data must explicitly be pulled into user-space memory for TLS encryption to work.
+ *
+ * 3) compressed sstable, transferred without SSL/TLS - data can be streamed via zero-copy transfer as the data does not
+ * need to be manipulated (it can be sent "as-is").
+ *
+ * <h3>Compressing the data</h3>
+ * We always want to transfer as few bytes as possible over the wire when streaming a file. If the
+ * sstable is not already compressed via table compression options, we apply an on-the-fly stream compression
+ * to the data. The stream compression format is documented in
+ * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer}
+ *
+ * You may be wondering: why implement your own compression scheme? Why not use netty's built-in compression codecs,
+ * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes complete sense if none of the sstables
+ * to be streamed use sstable compression (and obviously you wouldn't use stream compression when the sstables
+ * are using sstable compression). The problem is when you have a mix of files, some using sstable compression
+ * and some not. You can either:
+ *
+ * - send the files of one type over one kind of socket, and the others over another socket
+ * - send them both over the same socket, but then auto-adjust per file type.
+ *
+ * I've opted for the latter to keep socket/channel management simpler and cleaner.
+ */
+package org.apache.cassandra.streaming.async;
+
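
For illustration, the mode selection described above can be sketched as follows; the enum and
method are hypothetical and do not appear in this patch.

    final class TransferModes
    {
        enum TransferMode { USER_SPACE_STREAM_COMPRESSED, USER_SPACE_TLS, ZERO_COPY }

        static TransferMode selectMode(boolean sstableCompressed, boolean tlsEnabled)
        {
            if (!sstableCompressed)
                return TransferMode.USER_SPACE_STREAM_COMPRESSED; // case 1: stream-compress in user space
            if (tlsEnabled)
                return TransferMode.USER_SPACE_TLS;               // case 2: TLS forces data into user space
            return TransferMode.ZERO_COPY;                        // case 3: send the DATA component as-is
        }
    }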

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java b/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java
new file mode 100644
index 0000000..3f1b22b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/**
+ * The intent of this class is to serve one very narrow use-case: the stream compression path of streaming.
+ * This class should really only get calls to {@link #write(ByteBuffer)}, where the incoming buffer is compressed and sent
+ * downstream.
+ */
+public class ByteBufCompressionDataOutputStreamPlus extends WrappedDataOutputStreamPlus
+{
+    private final StreamRateLimiter limiter;
+    private final LZ4Compressor compressor;
+    private final StreamCompressionSerializer serializer;
+
+    public ByteBufCompressionDataOutputStreamPlus(DataOutputStreamPlus out, StreamRateLimiter limiter)
+    {
+        super(out);
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        serializer = new StreamCompressionSerializer(((ByteBufDataOutputStreamPlus)out).getAllocator());
+        this.limiter = limiter;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Compress the incoming buffer and send the result downstream. The buffer parameter will not be retained nor passed
+     * to downstream components, and thus callers can safely free the buffer upon return.
+     */
+    @Override
+    public void write(ByteBuffer buffer) throws IOException
+    {
+        ByteBuf compressed = serializer.serialize(compressor, buffer, StreamMessage.CURRENT_VERSION);
+
+        // this is a blocking call - you have been warned
+        limiter.acquire(compressed.readableBytes());
+
+        ((ByteBufDataOutputStreamPlus)out).writeToChannel(compressed);
+    }
+
+    @Override
+    public void close()
+    {
+        // explicitly overriding close() to avoid closing the wrapped stream; it will be closed via other means
+    }
+}
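
For reference, a self-contained example of the lz4-java calls the class above builds on
(LZ4Factory.fastestInstance().fastCompressor()); the payload and buffer sizes are illustrative.

    import java.nio.ByteBuffer;
    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;

    public class Lz4Example
    {
        public static void main(String[] args)
        {
            ByteBuffer src = ByteBuffer.wrap("some bytes worth compressing".getBytes());
            LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
            ByteBuffer dst = ByteBuffer.allocate(compressor.maxCompressedLength(src.remaining()));
            compressor.compress(src, dst); // advances the positions of both buffers
            dst.flip();                    // dst now holds the compressed payload
        }
    }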

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 003db61..76f76ea 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -19,7 +19,8 @@ package org.apache.cassandra.streaming.compress;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -27,42 +28,45 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 
 import com.google.common.collect.Iterators;
-import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.streaming.StreamReader.StreamDeserializer;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 /**
- * InputStream which reads data from underlining source with given {@link CompressionInfo}.
+ * InputStream which reads data from an underlying source with given {@link CompressionInfo}. Uses {@link #buffer} as a buffer
+ * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case).
  */
-public class CompressedInputStream extends InputStream
+public class CompressedInputStream extends RebufferingInputStream
 {
 
     private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
 
     private final CompressionInfo info;
     // chunk buffer
-    private final BlockingQueue<byte[]> dataBuffer;
+    private final BlockingQueue<ByteBuffer> dataBuffer;
     private final Supplier<Double> crcCheckChanceSupplier;
 
-    // uncompressed bytes
-    private final byte[] buffer;
+    /**
+     * The base offset of the current {@link #buffer} from the beginning of the stream.
+     */
+    private long bufferOffset = 0;
 
-    // offset from the beginning of the buffer
-    protected long bufferOffset = 0;
-    // current position in stream
+    /**
+     * The current {@link CompressedStreamReader#sections} offset in the stream.
+     */
     private long current = 0;
-    // number of bytes in the buffer that are actually valid
-    protected int validBufferBytes = -1;
 
     private final ChecksumType checksumType;
 
-    // raw checksum bytes
-    private final byte[] checksumBytes = new byte[4];
+    private static final int CHECKSUM_LENGTH = 4;
 
     /**
      * Indicates there was a problem when reading from source stream.
@@ -71,9 +75,9 @@ public class CompressedInputStream extends InputStream
      * with the cause of the error when reading from source stream, so it is
      * thrown to the consumer on subsequent read operation.
      */
-    private static final byte[] POISON_PILL = new byte[0];
+    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
 
-    protected volatile IOException readException = null;
+    private volatile IOException readException = null;
 
     private long totalCompressedBytesRead;
 
@@ -81,11 +85,11 @@ public class CompressedInputStream extends InputStream
      * @param source Input source to read compressed data from
      * @param info Compression info
      */
-    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
+    public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
     {
+        super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
+        buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read
         this.info = info;
-        this.buffer = new byte[info.parameters.chunkLength()];
-        // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
         this.crcCheckChanceSupplier = crcCheckChanceSupplier;
         this.checksumType = checksumType;
@@ -93,19 +97,50 @@ public class CompressedInputStream extends InputStream
         new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
     }
 
-    private void decompressNextChunk() throws IOException
+    /**
+     * Invoked when crossing into the next stream boundary in {@link CompressedStreamReader#sections}.
+     */
+    public void position(long position) throws IOException
     {
         if (readException != null)
             throw readException;
 
+        assert position >= current : "stream can only read forward.";
+        current = position;
+
+        if (current > bufferOffset + buffer.limit())
+            reBuffer(false);
+
+        buffer.position((int)(current - bufferOffset));
+    }
+
+    protected void reBuffer() throws IOException
+    {
+        reBuffer(true);
+    }
+
+    private void reBuffer(boolean updateCurrent) throws IOException
+    {
+        if (readException != null)
+        {
+            FileUtils.clean(buffer);
+            buffer = null;
+            throw readException;
+        }
+
+        // increment the offset into the stream based on the current buffer's read count
+        if (updateCurrent)
+            current += buffer.position();
+
         try
         {
-            byte[] compressedWithCRC = dataBuffer.take();
+            ByteBuffer compressedWithCRC = dataBuffer.take();
             if (compressedWithCRC == POISON_PILL)
             {
                 assert readException != null;
                 throw readException;
             }
+
             decompress(compressedWithCRC);
         }
         catch (InterruptedException e)
@@ -114,74 +149,49 @@ public class CompressedInputStream extends InputStream
         }
     }
 
-    @Override
-    public int read() throws IOException
+    private void decompress(ByteBuffer compressed) throws IOException
     {
-        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
-            decompressNextChunk();
-
-        assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
-
-        return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException
-    {
-        long nextCurrent = current + len;
-
-        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
-            decompressNextChunk();
+        final int compressedChunkLength = info.parameters.chunkLength();
+        int length = compressed.remaining();
 
-        assert nextCurrent >= bufferOffset;
-
-        int read = 0;
-        while (read < len)
+        // uncompress if the buffer size is less than chunk size. else, if the buffer size is equal to the compressedChunkLength,
+        // we assume the buffer is not compressed. see CASSANDRA-10520
+        final boolean releaseCompressedBuffer;
+        if (length - CHECKSUM_LENGTH < compressedChunkLength)
         {
-            int nextLen = Math.min((len - read), (int)((bufferOffset + validBufferBytes) - current));
-
-            System.arraycopy(buffer, (int)(current - bufferOffset), b, off + read, nextLen);
-            read += nextLen;
-
-            current += nextLen;
-            if (read != len)
-                decompressNextChunk();
+            buffer.clear();
+            compressed.limit(length - CHECKSUM_LENGTH);
+            info.parameters.getSstableCompressor().uncompress(compressed, buffer);
+            buffer.flip();
+            releaseCompressedBuffer = true;
         }
-
-        return len;
-    }
-
-    public void position(long position)
-    {
-        assert position >= current : "stream can only read forward.";
-        current = position;
-    }
-
-    private void decompress(byte[] compressed) throws IOException
-    {
-        // uncompress
-        if (compressed.length - checksumBytes.length < info.parameters.maxCompressedLength())
-            validBufferBytes = info.parameters.getSstableCompressor().uncompress(compressed, 0, compressed.length - checksumBytes.length, buffer, 0);
         else
         {
-            validBufferBytes = compressed.length - checksumBytes.length;
-            System.arraycopy(compressed, 0, buffer, 0, validBufferBytes);
+            FileUtils.clean(buffer);
+            buffer = compressed;
+            buffer.limit(length - CHECKSUM_LENGTH);
+            releaseCompressedBuffer = false;
         }
-        totalCompressedBytesRead += compressed.length;
+        totalCompressedBytesRead += length;
 
         // validate crc randomly
         double crcCheckChance = this.crcCheckChanceSupplier.get();
         if (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble())
         {
-            int checksum = (int) checksumType.of(compressed, 0, compressed.length - checksumBytes.length);
+            ByteBuffer crcBuf = compressed.duplicate();
+            crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
+            int checksum = (int) checksumType.of(crcBuf);
 
-            System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
-            if (Ints.fromByteArray(checksumBytes) != checksum)
+            crcBuf.limit(length);
+            if (crcBuf.getInt() != checksum)
                 throw new IOException("CRC unmatched");
         }
 
+        if (releaseCompressedBuffer)
+            FileUtils.clean(compressed);
+
         // buffer offset is always aligned
-        bufferOffset = current & ~(buffer.length - 1);
+        bufferOffset = current & ~(compressedChunkLength - 1);
     }
 
     public long getTotalCompressedBytesRead()
@@ -189,13 +199,26 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
+    /**
+     * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}.
+     */
+    @Override
+    public void close()
+    {
+        if (buffer != null)
+        {
+            FileUtils.clean(buffer);
+            buffer = null;
+        }
+    }
+
     class Reader extends WrappedRunnable
     {
-        private final InputStream source;
+        private final DataInputPlus source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
-        private final BlockingQueue<byte[]> dataBuffer;
+        private final BlockingQueue<ByteBuffer> dataBuffer;
 
-        Reader(InputStream source, CompressionInfo info, BlockingQueue<byte[]> dataBuffer)
+        Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer)
         {
             this.source = source;
             this.chunks = Iterators.forArray(info.chunks);
@@ -204,36 +227,54 @@ public class CompressedInputStream extends InputStream
 
         protected void runMayThrow() throws Exception
         {
-            byte[] compressedWithCRC;
+            byte[] tmp = null;
             while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
                 int readLength = chunk.length + 4; // read with CRC
-                compressedWithCRC = new byte[readLength];
-
-                int bufferRead = 0;
-                while (bufferRead < readLength)
+                ByteBuffer compressedWithCRC = null;
+                try
                 {
-                    try
+                    final int r;
+                    if (source instanceof ReadableByteChannel)
                     {
-                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                        if (r < 0)
-                        {
-                            readException = new EOFException("No chunk available");
-                            dataBuffer.put(POISON_PILL);
-                            return; // throw exception where we consume dataBuffer
-                        }
-                        bufferRead += r;
+                        compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+                        r = ((ReadableByteChannel)source).read(compressedWithCRC);
+                        compressedWithCRC.flip();
                     }
-                    catch (IOException e)
+                    else
                     {
-                        logger.warn("Error while reading compressed input stream.", e);
-                        readException = e;
+                        // read into an on-heap array, then copy over to an off-heap buffer. at a minimum, snappy requires
+                        // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer
+                        if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH)
+                            tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH];
+                        source.readFully(tmp, 0, readLength);
+                        compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+                        compressedWithCRC.put(tmp, 0, readLength);
+                        compressedWithCRC.position(0);
+                        r = readLength;
+                    }
+
+                    if (r < 0)
+                    {
+                        FileUtils.clean(compressedWithCRC);
+                        readException = new EOFException("No chunk available");
                         dataBuffer.put(POISON_PILL);
                         return; // throw exception where we consume dataBuffer
                     }
                 }
+                catch (IOException e)
+                {
+                    if (!(e instanceof EOFException))
+                        logger.warn("Error while reading compressed input stream.", e);
+                    if (compressedWithCRC != null)
+                        FileUtils.clean(compressedWithCRC);
+
+                    readException = e;
+                    dataBuffer.put(POISON_PILL);
+                    return; // throw exception where we consume dataBuffer
+                }
                 dataBuffer.put(compressedWithCRC);
             }
         }
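
For illustration, the probabilistic CRC validation above in isolation: a chunk's checksum is
verified only crcCheckChance of the time. CRC32 here stands in for the configurable
ChecksumType, and the class and method names are hypothetical.

    import java.io.IOException;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.zip.CRC32;

    final class CrcCheck
    {
        static void maybeValidate(byte[] compressedChunk, int expectedCrc, double crcCheckChance) throws IOException
        {
            if (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble())
            {
                CRC32 crc = new CRC32();
                crc.update(compressedChunk); // checksum covers the compressed bytes, minus the trailing CRC
                if ((int) crc.getValue() != expectedCrc)
                    throw new IOException("CRC unmatched");
            }
        }
    }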

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index f8e4b40..e40788b 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.streaming.compress;
 
 import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 import org.slf4j.Logger;
@@ -28,7 +26,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.util.TrackedInputStream;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
@@ -59,8 +58,8 @@ public class CompressedStreamReader extends StreamReader
      * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
     @Override
-    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
-    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
     {
         long totalSize = totalSize();
 
@@ -76,9 +75,9 @@ public class CompressedStreamReader extends StreamReader
                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
                      cfs.getTableName());
 
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+        CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo,
                                                               ChecksumType.CRC32, cfs::getCrcCheckChance);
-        TrackedInputStream in = new TrackedInputStream(cis);
+        TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
 
         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
         SSTableMultiWriter writer = null;
@@ -120,6 +119,10 @@ public class CompressedStreamReader extends StreamReader
                 throw e;
             throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 185ab22..0e78b7d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.streaming.compress;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -30,6 +31,8 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamWriter;
@@ -41,7 +44,7 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressedStreamWriter extends StreamWriter
 {
-    public static final int CHUNK_SIZE = 10 * 1024 * 1024;
+    private static final int CHUNK_SIZE = 1 << 16;
 
     private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
 
@@ -56,6 +59,8 @@ public class CompressedStreamWriter extends StreamWriter
     @Override
     public void write(DataOutputStreamPlus out) throws IOException
     {
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
         long totalSize = totalSize();
         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
@@ -79,10 +84,24 @@ public class CompressedStreamWriter extends StreamWriter
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
                 {
-                    final long bytesTransferredFinal = bytesTransferred;
                     final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                     limiter.acquire(toTransfer);
-                    long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
+
+                    ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
+                    long lastWrite;
+                    try
+                    {
+                        lastWrite = fc.read(outBuffer, section.left + bytesTransferred);
+                        assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
+                        outBuffer.flip();
+                        output.writeToChannel(outBuffer);
+                    }
+                    catch (IOException e)
+                    {
+                        FileUtils.clean(outBuffer);
+                        throw e;
+                    }
+
                     bytesTransferred += lastWrite;
                     progress += lastWrite;
                     session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
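
The transfer loop above in miniature, using Guava's RateLimiter as a stand-in for
StreamManager.StreamRateLimiter; the throughput figure and names are illustrative.

    import com.google.common.util.concurrent.RateLimiter;

    final class ChunkedTransfer
    {
        static void transfer(long length, int chunkSize, RateLimiter limiter)
        {
            long bytesTransferred = 0;
            while (bytesTransferred < length)
            {
                int toTransfer = (int) Math.min(chunkSize, length - bytesTransferred);
                limiter.acquire(toTransfer); // blocks until enough permits (bytes) accrue
                // ... read toTransfer bytes from the file and write them to the channel ...
                bytesTransferred += toTransfer;
            }
        }
    }

    // e.g. ChunkedTransfer.transfer(fileLength, 1 << 16, RateLimiter.create(1 << 20)); // ~1 MiB/s cap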

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
new file mode 100644
index 0000000..4b1459d
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.compress;
+
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
+
+public class StreamCompressionInputStream extends RebufferingInputStream
+{
+    /**
+     * The stream which contains buffers of compressed data that came from the peer.
+     */
+    private final DataInputPlus dataInputPlus;
+
+    private final LZ4FastDecompressor decompressor;
+    private final int protocolVersion;
+    private final StreamCompressionSerializer deserializer;
+
+    /**
+     * The parent, or owning, buffer of the current buffer being read from ({@link RebufferingInputStream#buffer}).
+     */
+    private ByteBuf currentBuf;
+
+    public StreamCompressionInputStream(DataInputPlus dataInputPlus, int protocolVersion)
+    {
+        super(Unpooled.EMPTY_BUFFER.nioBuffer());
+        currentBuf = Unpooled.EMPTY_BUFFER;
+
+        this.dataInputPlus = dataInputPlus;
+        this.protocolVersion = protocolVersion;
+        this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+
+        ByteBufAllocator allocator = dataInputPlus instanceof RebufferingByteBufDataInputPlus
+                                     ? ((RebufferingByteBufDataInputPlus)dataInputPlus).getAllocator()
+                                     : PooledByteBufAllocator.DEFAULT;
+        deserializer = new StreamCompressionSerializer(allocator);
+    }
+
+    @Override
+    public void reBuffer() throws IOException
+    {
+        currentBuf.release();
+        currentBuf = deserializer.deserialize(decompressor, dataInputPlus, protocolVersion);
+        buffer = currentBuf.nioBuffer(0, currentBuf.readableBytes());
+    }
+
+    @Override
+    public void close()
+    {
+        currentBuf.release();
+    }
+}
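
For illustration, the rebuffering contract the class above implements: reads drain the current
buffer, and reBuffer() is invoked to swap in a fresh one when it runs dry. This is a simplified
sketch of the pattern, not the actual RebufferingInputStream API.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    abstract class RebufferingSketch
    {
        protected ByteBuffer buffer;

        int read() throws IOException
        {
            if (!buffer.hasRemaining())
                reBuffer(); // refill (or replace) the buffer from the underlying source
            return buffer.get() & 0xff;
        }

        // implementations refill `buffer`, e.g. by decompressing the next frame
        protected abstract void reBuffer() throws IOException;
    }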

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index 44ff553..81e16f7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -17,8 +17,7 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import java.nio.channels.ReadableByteChannel;
-
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
@@ -26,12 +25,17 @@ public class CompleteMessage extends StreamMessage
 {
     public static Serializer<CompleteMessage> serializer = new Serializer<CompleteMessage>()
     {
-        public CompleteMessage deserialize(ReadableByteChannel in, int version, StreamSession session)
+        public CompleteMessage deserialize(DataInputPlus in, int version, StreamSession session)
         {
             return new CompleteMessage();
         }
 
         public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) {}
+
+        public long serializedSize(CompleteMessage message, int version)
+        {
+            return 0;
+        }
     };
 
     public CompleteMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index c65e1d4..fedb971 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -29,7 +30,10 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -42,6 +46,8 @@ public class FileMessageHeader
     public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
 
     public final TableId tableId;
+    public UUID planId;
+    public int sessionIndex;
     public final int sequenceNumber;
     /** SSTable version */
     public final Version version;
@@ -61,11 +67,15 @@ public class FileMessageHeader
     public final UUID pendingRepair;
     public final int sstableLevel;
     public final SerializationHeader.Component header;
+    public final InetAddress sender;
 
     /* cached size value */
     private transient final long size;
 
-    public FileMessageHeader(TableId tableId,
+    private FileMessageHeader(TableId tableId,
+                             InetAddress sender,
+                             UUID planId,
+                             int sessionIndex,
                              int sequenceNumber,
                              Version version,
                              SSTableFormat.Type format,
@@ -78,6 +88,9 @@ public class FileMessageHeader
                              SerializationHeader.Component header)
     {
         this.tableId = tableId;
+        this.sender = sender;
+        this.planId = planId;
+        this.sessionIndex = sessionIndex;
         this.sequenceNumber = sequenceNumber;
         this.version = version;
         this.format = format;
@@ -93,6 +106,9 @@ public class FileMessageHeader
     }
 
     public FileMessageHeader(TableId tableId,
+                             InetAddress sender,
+                             UUID planId,
+                             int sessionIndex,
                              int sequenceNumber,
                              Version version,
                              SSTableFormat.Type format,
@@ -105,6 +121,9 @@ public class FileMessageHeader
                              SerializationHeader.Component header)
     {
         this.tableId = tableId;
+        this.sender = sender;
+        this.planId = planId;
+        this.sessionIndex = sessionIndex;
         this.sequenceNumber = sequenceNumber;
         this.version = version;
         this.format = format;
@@ -188,11 +207,20 @@ public class FileMessageHeader
         return result;
     }
 
+    public void addSessionInfo(StreamSession session)
+    {
+        planId = session.planId();
+        sessionIndex = session.sessionIndex();
+    }
+
     static class FileMessageHeaderSerializer
     {
         public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
         {
             header.tableId.serialize(out);
+            CompactEndpointSerializationHelper.serialize(header.sender, out);
+            UUIDSerializer.serializer.serialize(header.planId, out, version);
+            out.writeInt(header.sessionIndex);
             out.writeInt(header.sequenceNumber);
             out.writeUTF(header.version.toString());
             out.writeUTF(header.format.name);
@@ -224,6 +252,9 @@ public class FileMessageHeader
         public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
         {
             TableId tableId = TableId.deserialize(in);
+            InetAddress sender = CompactEndpointSerializationHelper.deserialize(in);
+            UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            int sessionIndex = in.readInt();
             int sequenceNumber = in.readInt();
             Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
             SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
@@ -239,12 +270,15 @@ public class FileMessageHeader
             int sstableLevel = in.readInt();
             SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-            return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header);
+            return new FileMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
         {
             long size = header.tableId.serializedSize();
+            size += CompactEndpointSerializationHelper.serializedSize(header.sender);
+            size += UUIDSerializer.serializer.serializedSize(header.planId, version);
+            size += TypeSizes.sizeof(header.sessionIndex);
             size += TypeSizes.sizeof(header.sequenceNumber);
             size += TypeSizes.sizeof(header.version.toString());
             size += TypeSizes.sizeof(header.format.name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 03bcaed..9f43982 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -18,15 +18,15 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamReceiveException;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.compress.CompressedStreamReader;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -40,21 +40,27 @@ public class IncomingFileMessage extends StreamMessage
     public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>()
     {
         @SuppressWarnings("resource")
-        public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        public IncomingFileMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
         {
-            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
             FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
+            session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
+            if (session == null)
+                throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
+            if (cfs == null)
+                throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
+
             StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
-                    : new CompressedStreamReader(header, session);
+                                                         : new CompressedStreamReader(header, session);
 
             try
             {
-                return new IncomingFileMessage(reader.read(in), header);
+                return new IncomingFileMessage(reader.read(input), header);
             }
             catch (Throwable t)
             {
                 JVMStabilityInspector.inspectThrowable(t);
-                throw t;
+                throw new StreamReceiveException(session, t);
             }
         }
 
@@ -62,6 +68,11 @@ public class IncomingFileMessage extends StreamMessage
         {
             throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
         }
+
+        public long serializedSize(IncomingFileMessage message, int version)
+        {
+            throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming file");
+        }
     };
 
     public FileMessageHeader header;
@@ -77,7 +88,8 @@ public class IncomingFileMessage extends StreamMessage
     @Override
     public String toString()
     {
-        return "File (" + header + ", file: " + sstable.getFilename() + ")";
+        String filename = sstable != null ? sstable.getFilename() : null;
+        return "File (" + header + ", file: " + filename + ")";
     }
 }
 

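A pattern worth calling out, since it repeats in every message diff below: deserialize now takes a DataInputPlus rather than a raw ReadableByteChannel (the netty inbound path hands handlers an already-rebuffered input, so the per-message Channels.newInputStream wrapping disappears), and every serializer gains a serializedSize method for the outbound path. Reconstructed from the signatures shown in these diffs, the implied StreamMessage.Serializer contract is roughly:

    public interface Serializer<V extends StreamMessage>
    {
        V deserialize(DataInputPlus in, int version, StreamSession session) throws IOException;
        void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException;
        long serializedSize(V message, int version);
    }

Note also that IncomingFileMessage now ignores the session argument and re-resolves it from the header via StreamManager.instance.findSession: on the inbound netty path it is the file header, not the connection, that identifies which session a file belongs to.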
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index bfdc72e..f80c617 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -19,8 +19,8 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
 
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
@@ -28,13 +28,18 @@ public class KeepAliveMessage extends StreamMessage
 {
     public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
     {
-        public KeepAliveMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        public KeepAliveMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException
         {
             return new KeepAliveMessage();
         }
 
         public void serialize(KeepAliveMessage message, DataOutputStreamPlus out, int version, StreamSession session)
         {}
+
+        public long serializedSize(KeepAliveMessage message, int version)
+        {
+            return 0;
+        }
     };
 
     public KeepAliveMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index e3e6b9b..f44b41c 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -18,17 +18,18 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamWriter;
 import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
 
@@ -39,7 +40,7 @@ public class OutgoingFileMessage extends StreamMessage
 {
     public static Serializer<OutgoingFileMessage> serializer = new Serializer<OutgoingFileMessage>()
     {
-        public OutgoingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session)
+        public OutgoingFileMessage deserialize(DataInputPlus in, int version, StreamSession session)
         {
             throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
         }
@@ -57,6 +58,11 @@ public class OutgoingFileMessage extends StreamMessage
                 message.finishTransfer();
             }
         }
+
+        public long serializedSize(OutgoingFileMessage message, int version)
+        {
+            return 0;
+        }
     };
 
     public final FileMessageHeader header;
@@ -65,7 +71,7 @@ public class OutgoingFileMessage extends StreamMessage
     private boolean completed = false;
     private boolean transferring = false;
 
-    public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
+    public OutgoingFileMessage(Ref<SSTableReader> ref, StreamSession session, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
     {
         super(Type.FILE);
         this.ref = ref;
@@ -73,6 +79,9 @@ public class OutgoingFileMessage extends StreamMessage
         SSTableReader sstable = ref.get();
         filename = sstable.getFilename();
         this.header = new FileMessageHeader(sstable.metadata().id,
+                                            FBUtilities.getBroadcastAddress(),
+                                            session.planId(),
+                                            session.sessionIndex(),
                                             sequenceNumber,
                                             sstable.descriptor.version,
                                             sstable.descriptor.formatType,
@@ -93,12 +102,12 @@ public class OutgoingFileMessage extends StreamMessage
         }
 
         CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
-
+        out.flush();
         final SSTableReader reader = ref.get();
         StreamWriter writer = compressionInfo == null ?
-                                      new StreamWriter(reader, header.sections, session) :
-                                      new CompressedStreamWriter(reader, header.sections,
-                                                                 compressionInfo, session);
+                              new StreamWriter(reader, header.sections, session) :
+                              new CompressedStreamWriter(reader, header.sections,
+                                                         compressionInfo, session);
         writer.write(out);
     }
 
@@ -140,5 +149,10 @@ public class OutgoingFileMessage extends StreamMessage
     {
         return "File (" + header + ", file: " + filename + ")";
     }
+
+    public String getFilename()
+    {
+        return filename;
+    }
 }
 

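Two changes here are easy to miss. First, the constructor now takes the StreamSession so the FileMessageHeader can carry the sender address, plan id and session index that the receiving side uses for the session lookup shown in IncomingFileMessage above; a call site would now look roughly like this (argument names are illustrative, not lifted from the patch):

    // sketch only: ref and sections come from the transfer task, as before
    OutgoingFileMessage message = new OutgoingFileMessage(ref, session, sequenceNumber,
                                                          estimatedKeys, sections, keepSSTableLevel);

Second, serialize now flushes the header before handing off to the StreamWriter, presumably so the header bytes reach the channel ahead of the file sections, which the writer may push through a different (e.g. zero-copy) path to the socket.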
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
new file mode 100644
index 0000000..f43ff01
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamSession;
+
+public class PrepareAckMessage extends StreamMessage
+{
+    public static Serializer<PrepareAckMessage> serializer = new Serializer<PrepareAckMessage>()
+    {
+        public void serialize(PrepareAckMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+        {
+            //nop
+        }
+
+        public PrepareAckMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException
+        {
+            return new PrepareAckMessage();
+        }
+
+        public long serializedSize(PrepareAckMessage message, int version)
+        {
+            return 0;
+        }
+    };
+
+    public PrepareAckMessage()
+    {
+        super(Type.PREPARE_ACK);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Prepare ACK";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
deleted file mode 100644
index 1f53be7..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.streaming.StreamRequest;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.StreamSummary;
-
-public class PrepareMessage extends StreamMessage
-{
-    public static Serializer<PrepareMessage> serializer = new Serializer<PrepareMessage>()
-    {
-        @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
-        public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
-        {
-            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
-            PrepareMessage message = new PrepareMessage();
-            // requests
-            int numRequests = input.readInt();
-            for (int i = 0; i < numRequests; i++)
-                message.requests.add(StreamRequest.serializer.deserialize(input, version));
-            // summaries
-            int numSummaries = input.readInt();
-            for (int i = 0; i < numSummaries; i++)
-                message.summaries.add(StreamSummary.serializer.deserialize(input, version));
-            return message;
-        }
-
-        public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
-        {
-            // requests
-            out.writeInt(message.requests.size());
-            for (StreamRequest request : message.requests)
-                StreamRequest.serializer.serialize(request, out, version);
-            // summaries
-            out.writeInt(message.summaries.size());
-            for (StreamSummary summary : message.summaries)
-                StreamSummary.serializer.serialize(summary, out, version);
-        }
-    };
-
-    /**
-     * Streaming requests
-     */
-    public final Collection<StreamRequest> requests = new ArrayList<>();
-
-    /**
-     * Summaries of streaming out
-     */
-    public final Collection<StreamSummary> summaries = new ArrayList<>();
-
-    public PrepareMessage()
-    {
-        super(Type.PREPARE);
-    }
-
-    @Override
-    public String toString()
-    {
-        final StringBuilder sb = new StringBuilder("Prepare (");
-        sb.append(requests.size()).append(" requests, ");
-        int totalFile = 0;
-        for (StreamSummary summary : summaries)
-            totalFile += summary.files;
-        sb.append(" ").append(totalFile).append(" files");
-        sb.append('}');
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
new file mode 100644
index 0000000..2d8026c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class PrepareSynAckMessage extends StreamMessage
+{
+    public static Serializer<PrepareSynAckMessage> serializer = new Serializer<PrepareSynAckMessage>()
+    {
+        public void serialize(PrepareSynAckMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+        {
+            out.writeInt(message.summaries.size());
+            for (StreamSummary summary : message.summaries)
+                StreamSummary.serializer.serialize(summary, out, version);
+        }
+
+        public PrepareSynAckMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
+        {
+            PrepareSynAckMessage message = new PrepareSynAckMessage();
+            int numSummaries = input.readInt();
+            for (int i = 0; i < numSummaries; i++)
+                message.summaries.add(StreamSummary.serializer.deserialize(input, version));
+            return message;
+        }
+
+        public long serializedSize(PrepareSynAckMessage message, int version)
+        {
+            long size = 4; // count of summaries
+            for (StreamSummary summary : message.summaries)
+                size += StreamSummary.serializer.serializedSize(summary, version);
+            return size;
+        }
+    };
+
+    /**
+     * Summaries of streaming out
+     */
+    public final Collection<StreamSummary> summaries = new ArrayList<>();
+
+    public PrepareSynAckMessage()
+    {
+        super(Type.PREPARE_SYNACK);
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder sb = new StringBuilder("Prepare SYNACK (");
+        int totalFile = 0;
+        for (StreamSummary summary : summaries)
+            totalFile += summary.files;
+        sb.append(" ").append(totalFile).append(" files");
+        sb.append('}');
+        return sb.toString();
+    }
+}

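The serializedSize implementations are not dead weight: on a netty outbound path a message is typically serialized into a ByteBuf that must be allocated up front, rather than written through a blocking socket stream. A minimal sketch of that use, assuming netty's ByteBufAllocator and Guava's Ints (the actual sender code lives elsewhere in this patch series):

    long size = PrepareSynAckMessage.serializer.serializedSize(message, version);
    ByteBuf buffer = allocator.buffer(Ints.checkedCast(size)); // allocator: io.netty.buffer.ByteBufAllocator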
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
new file mode 100644
index 0000000..6fbaafa
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamRequest;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class PrepareSynMessage extends StreamMessage
+{
+    public static Serializer<PrepareSynMessage> serializer = new Serializer<PrepareSynMessage>()
+    {
+        public PrepareSynMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
+        {
+            PrepareSynMessage message = new PrepareSynMessage();
+            // requests
+            int numRequests = input.readInt();
+            for (int i = 0; i < numRequests; i++)
+                message.requests.add(StreamRequest.serializer.deserialize(input, version));
+            // summaries
+            int numSummaries = input.readInt();
+            for (int i = 0; i < numSummaries; i++)
+                message.summaries.add(StreamSummary.serializer.deserialize(input, version));
+            return message;
+        }
+
+        public long serializedSize(PrepareSynMessage message, int version)
+        {
+            long size = 4 + 4; // count of requests and count of summaries
+            for (StreamRequest request : message.requests)
+                size += StreamRequest.serializer.serializedSize(request, version);
+            for (StreamSummary summary : message.summaries)
+                size += StreamSummary.serializer.serializedSize(summary, version);
+            return size;
+        }
+
+        public void serialize(PrepareSynMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+        {
+            // requests
+            out.writeInt(message.requests.size());
+            for (StreamRequest request : message.requests)
+                StreamRequest.serializer.serialize(request, out, version);
+            // summaries
+            out.writeInt(message.summaries.size());
+            for (StreamSummary summary : message.summaries)
+                StreamSummary.serializer.serialize(summary, out, version);
+        }
+    };
+
+    /**
+     * Streaming requests
+     */
+    public final Collection<StreamRequest> requests = new ArrayList<>();
+
+    /**
+     * Summaries of streaming out
+     */
+    public final Collection<StreamSummary> summaries = new ArrayList<>();
+
+    public PrepareSynMessage()
+    {
+        super(Type.PREPARE_SYN);
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder sb = new StringBuilder("Prepare SYN (");
+        sb.append(requests.size()).append(" requests, ");
+        int totalFile = 0;
+        for (StreamSummary summary : summaries)
+            totalFile += summary.files;
+        sb.append(" ").append(totalFile).append(" files");
+        sb.append('}');
+        return sb.toString();
+    }
+}

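Taken together, PrepareSynMessage, PrepareSynAckMessage and PrepareAckMessage replace the deleted bidirectional PrepareMessage with an explicit three-step exchange. The payloads suggest the following flow; the direction labels are an inference from which side owns requests versus summaries, not something this diff states:

    // PREPARE_SYN     initiator -> peer:  stream requests + summaries of files to send
    // PREPARE_SYNACK  peer -> initiator:  summaries of the files the peer will stream back
    // PREPARE_ACK     initiator -> peer:  empty; both sides can begin streaming files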
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 55dd7e6..3988dcc 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -18,11 +18,8 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.StreamSession;
@@ -32,9 +29,8 @@ public class ReceivedMessage extends StreamMessage
     public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>()
     {
         @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
-        public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        public ReceivedMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
         {
-            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
             return new ReceivedMessage(TableId.deserialize(input), input.readInt());
         }
 
@@ -43,6 +39,11 @@ public class ReceivedMessage extends StreamMessage
             message.tableId.serialize(out);
             out.writeInt(message.sequenceNumber);
         }
+
+        public long serializedSize(ReceivedMessage message, int version)
+        {
+            return message.tableId.serializedSize() + 4;
+        }
     };
 
     public final TableId tableId;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
deleted file mode 100644
index 047fb06..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.UUID;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-/**
- * @deprecated retry support removed on CASSANDRA-10992
- */
-@Deprecated
-public class RetryMessage extends StreamMessage
-{
-    public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()
-    {
-        @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
-        public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
-        {
-            DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
-            return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
-        }
-
-        public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
-        {
-            UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
-            out.writeInt(message.sequenceNumber);
-        }
-    };
-
-    public final UUID cfId;
-    public final int sequenceNumber;
-
-    public RetryMessage(UUID cfId, int sequenceNumber)
-    {
-        super(Type.RETRY);
-        this.cfId = cfId;
-        this.sequenceNumber = sequenceNumber;
-    }
-
-    @Override
-    public String toString()
-    {
-        final StringBuilder sb = new StringBuilder("Retry (");
-        sb.append(cfId).append(", #").append(sequenceNumber).append(')');
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index 4a5b6df..59ad90e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -17,8 +17,7 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import java.nio.channels.ReadableByteChannel;
-
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
@@ -26,12 +25,17 @@ public class SessionFailedMessage extends StreamMessage
 {
     public static Serializer<SessionFailedMessage> serializer = new Serializer<SessionFailedMessage>()
     {
-        public SessionFailedMessage deserialize(ReadableByteChannel in, int version, StreamSession session)
+        public SessionFailedMessage deserialize(DataInputPlus in, int version, StreamSession session)
         {
             return new SessionFailedMessage();
         }
 
         public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) {}
+
+        public long serializedSize(SessionFailedMessage message, int version)
+        {
+            return 0;
+        }
     };
 
     public SessionFailedMessage()

