You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/15 23:42:52 UTC

[4/5] cassandra git commit: Abstract streaming for pluggable storage

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
new file mode 100644
index 0000000..2f56786
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.DoubleSupplier;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.util.concurrent.FastThreadLocalThread;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.db.streaming.CassandraStreamReader.StreamDeserializer;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * InputStream which reads data from the underlying source with the given {@link CompressionInfo}. Uses {@link #buffer} as a buffer
+ * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case).
+ */
+public class CompressedInputStream extends RebufferingInputStream implements AutoCloseable
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
+    private final CompressionInfo info;
+    // chunk buffer
+    private final BlockingQueue<ByteBuffer> dataBuffer;
+    private final DoubleSupplier crcCheckChanceSupplier;
+
+    /**
+     * The base offset of the current {@link #buffer} from the beginning of the stream.
+     */
+    private long bufferOffset = 0;
+
+    /**
+     * The current {@link CompressedCassandraStreamReader#sections} offset in the stream.
+     */
+    private long current = 0;
+
+    private final ChecksumType checksumType;
+
+    private static final int CHECKSUM_LENGTH = 4;
+
+    /**
+     * Indicates there was a problem when reading from source stream.
+     * When this is added to the <code>dataBuffer</code> by the stream Reader,
+     * it is expected that the <code>readException</code> variable is populated
+     * with the cause of the error when reading from source stream, so it is
+     * thrown to the consumer on subsequent read operation.
+     */
+    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
+
+    private volatile IOException readException = null;
+
+    private long totalCompressedBytesRead;
+
+    /**
+     * @param source Input source to read compressed data from
+     * @param info Compression info
+     */
+    public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier)
+    {
+        super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
+        buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read
+        this.info = info;
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+        this.checksumType = checksumType;
+
+        new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
+    }
+
+    /**
+     * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}.
+     */
+    public void position(long position) throws IOException
+    {
+        if (readException != null)
+            throw readException;
+
+        assert position >= current : "stream can only read forward.";
+        current = position;
+
+        if (current > bufferOffset + buffer.limit())
+            reBuffer(false);
+
+        buffer.position((int)(current - bufferOffset));
+    }
+
+    protected void reBuffer() throws IOException
+    {
+        reBuffer(true);
+    }
+
+    private void reBuffer(boolean updateCurrent) throws IOException
+    {
+        if (readException != null)
+        {
+            FileUtils.clean(buffer);
+            buffer = null;
+            throw readException;
+        }
+
+        // increment the offset into the stream based on the current buffer's read count
+        if (updateCurrent)
+            current += buffer.position();
+
+        try
+        {
+            ByteBuffer compressedWithCRC = dataBuffer.take();
+            if (compressedWithCRC == POISON_PILL)
+            {
+                assert readException != null;
+                throw readException;
+            }
+
+            decompress(compressedWithCRC);
+        }
+        catch (InterruptedException e)
+        {
+            throw new EOFException("No chunk available");
+        }
+    }
+
+    private void decompress(ByteBuffer compressed) throws IOException
+    {
+        int length = compressed.remaining();
+
+        // uncompress if the chunk length (excluding the checksum) is less than maxCompressedLength. else, if it is greater than or equal to maxCompressedLength,
+        // we assume the chunk is not compressed. see CASSANDRA-10520
+        final boolean releaseCompressedBuffer;
+        if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength())
+        {
+            buffer.clear();
+            compressed.limit(length - CHECKSUM_LENGTH);
+            info.parameters.getSstableCompressor().uncompress(compressed, buffer);
+            buffer.flip();
+            releaseCompressedBuffer = true;
+        }
+        else
+        {
+            FileUtils.clean(buffer);
+            buffer = compressed;
+            buffer.limit(length - CHECKSUM_LENGTH);
+            releaseCompressedBuffer = false;
+        }
+        totalCompressedBytesRead += length;
+
+        // validate crc randomly
+        double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble();
+        if (crcCheckChance >= 1d ||
+            (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble()))
+        {
+            ByteBuffer crcBuf = compressed.duplicate();
+            crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
+            int checksum = (int) checksumType.of(crcBuf);
+
+            crcBuf.limit(length);
+            if (crcBuf.getInt() != checksum)
+                throw new IOException("CRC unmatched");
+        }
+
+        if (releaseCompressedBuffer)
+            FileUtils.clean(compressed);
+
+        // buffer offset is always aligned
+        final int compressedChunkLength = info.parameters.chunkLength();
+        bufferOffset = current & ~(compressedChunkLength - 1);
+    }
+
+    public long getTotalCompressedBytesRead()
+    {
+        return totalCompressedBytesRead;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}.
+     */
+    @Override
+    public void close()
+    {
+        if (buffer != null)
+        {
+            FileUtils.clean(buffer);
+            buffer = null;
+        }
+    }
+
+    class Reader extends WrappedRunnable
+    {
+        private final DataInputPlus source;
+        private final Iterator<CompressionMetadata.Chunk> chunks;
+        private final BlockingQueue<ByteBuffer> dataBuffer;
+
+        Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer)
+        {
+            this.source = source;
+            this.chunks = Iterators.forArray(info.chunks);
+            this.dataBuffer = dataBuffer;
+        }
+
+        protected void runMayThrow() throws Exception
+        {
+            byte[] tmp = null;
+            while (chunks.hasNext())
+            {
+                CompressionMetadata.Chunk chunk = chunks.next();
+
+                int readLength = chunk.length + 4; // read with CRC
+                ByteBuffer compressedWithCRC = null;
+                try
+                {
+                    final int r;
+                    if (source instanceof ReadableByteChannel)
+                    {
+                        compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+                        r = ((ReadableByteChannel)source).read(compressedWithCRC);
+                        compressedWithCRC.flip();
+                    }
+                    else
+                    {
+                        // read into an on-heap array, then copy over to an off-heap buffer. at a minimum snappy requires
+                        // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer
+                        if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH)
+                            tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH];
+                        source.readFully(tmp, 0, readLength);
+                        compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+                        compressedWithCRC.put(tmp, 0, readLength);
+                        compressedWithCRC.position(0);
+                        r = readLength;
+                    }
+
+                    if (r < 0)
+                    {
+                        FileUtils.clean(compressedWithCRC);
+                        readException = new EOFException("No chunk available");
+                        dataBuffer.put(POISON_PILL);
+                        return; // throw exception where we consume dataBuffer
+                    }
+                }
+                catch (IOException e)
+                {
+                    if (!(e instanceof EOFException))
+                        logger.warn("Error while reading compressed input stream.", e);
+                    if (compressedWithCRC != null)
+                        FileUtils.clean(compressedWithCRC);
+
+                    readException = e;
+                    dataBuffer.put(POISON_PILL);
+                    return; // throw exception where we consume dataBuffer
+                }
+                dataBuffer.put(compressedWithCRC);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
new file mode 100644
index 0000000..0f0d5c7
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Container that carries compression parameters and chunks to decompress data from stream.
+ */
+public class CompressionInfo
+{
+    public static final IVersionedSerializer<CompressionInfo> serializer = new CompressionInfoSerializer();
+
+    public final CompressionMetadata.Chunk[] chunks;
+    public final CompressionParams parameters;
+
+    public CompressionInfo(CompressionMetadata.Chunk[] chunks, CompressionParams parameters)
+    {
+        assert chunks != null && parameters != null;
+        this.chunks = chunks;
+        this.parameters = parameters;
+    }
+
+    static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<Pair<Long, Long>> sections)
+    {
+        if (metadata == null)
+        {
+            return null;
+        }
+        else
+        {
+            return new CompressionInfo(metadata.getChunksForSections(sections), metadata.parameters);
+        }
+
+    }
+
+    static class CompressionInfoSerializer implements IVersionedSerializer<CompressionInfo>
+    {
+        public void serialize(CompressionInfo info, DataOutputPlus out, int version) throws IOException
+        {
+            if (info == null)
+            {
+                out.writeInt(-1);
+                return;
+            }
+
+            int chunkCount = info.chunks.length;
+            out.writeInt(chunkCount);
+            for (int i = 0; i < chunkCount; i++)
+                CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], out, version);
+            // compression params
+            CompressionParams.serializer.serialize(info.parameters, out, version);
+        }
+
+        public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException
+        {
+            // chunks
+            int chunkCount = in.readInt();
+            if (chunkCount < 0)
+                return null;
+
+            CompressionMetadata.Chunk[] chunks = new CompressionMetadata.Chunk[chunkCount];
+            for (int i = 0; i < chunkCount; i++)
+                chunks[i] = CompressionMetadata.Chunk.serializer.deserialize(in, version);
+
+            // compression params
+            CompressionParams parameters = CompressionParams.serializer.deserialize(in, version);
+            return new CompressionInfo(chunks, parameters);
+        }
+
+        public long serializedSize(CompressionInfo info, int version)
+        {
+            if (info == null)
+                return TypeSizes.sizeof(-1);
+
+            // chunks
+            int chunkCount = info.chunks.length;
+            long size = TypeSizes.sizeof(chunkCount);
+            for (int i = 0; i < chunkCount; i++)
+                size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version);
+            // compression params
+            size += CompressionParams.serializer.serializedSize(info.parameters, version);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/package-info.java b/src/java/org/apache/cassandra/db/streaming/package-info.java
new file mode 100644
index 0000000..1e117aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/package-info.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h2>File transfer</h2>
+ *
+ * When transferring whole or subsections of an sstable, only the DATA component is shipped. To that end,
+ * there are three "modes" of an sstable transfer that need to be handled somewhat differently:
+ *
+ * 1) uncompressed sstable - data needs to be read into user space so it can be manipulated: checksum validation,
+ * apply stream compression (see next section), and/or TLS encryption.
+ *
+ * 2) compressed sstable, transferred with SSL/TLS - data needs to be read into user space as that is where the TLS encryption
+ * needs to happen. Netty does not allow the pretense of doing zero-copy transfers when TLS is in the pipeline;
+ * data must explicitly be pulled into user-space memory for TLS encryption to work.
+ *
+ * 3) compressed sstable, transferred without SSL/TLS - data can be streamed via zero-copy transfer as the data does not
+ * need to be manipulated (it can be sent "as-is").
+ *
+ * <h3>Compressing the data</h3>
+ * We always want to transfer as few bytes as possible over the wire when streaming a file. If the
+ * sstable is not already compressed via table compression options, we apply an on-the-fly stream compression
+ * to the data. The stream compression format is documented in
+ * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer}
+ *
+ * You may be wondering: why implement your own compression scheme? why not use netty's built-in compression codecs,
+ * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes complete sense if all the sstables
+ * to be streamed are not using sstable compression (and obviously you wouldn't use stream compression when the sstables
+ * are using sstable compression). The problem is when you have a mix of files, some using sstable compression
+ * and some not. You can either:
+ *
+ * - send the files of one type over one kind of socket, and the others over another socket
+ * - send them both over the same socket, but then auto-adjust per each file type.
+ *
+ * I've opted for the latter to keep socket/channel management simpler and cleaner.
+ *
+ */
+package org.apache.cassandra.db.streaming;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 439ebc6..dfabac2 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -155,11 +156,12 @@ public class RangeStreamer
                          boolean connectSequentially,
                          int connectionsPerHost)
     {
+        Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
         this.metadata = metadata;
         this.tokens = tokens;
         this.address = address;
         this.description = streamOperation.getDescription();
-        this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, true, connectSequentially, null, PreviewKind.NONE);
+        this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, connectSequentially, null, PreviewKind.NONE);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;
         this.stateStore = stateStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7d77ad5..980fdf1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.*;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -52,7 +52,7 @@ public class SSTableLoader implements StreamEventHandler
     private final Set<InetAddressAndPort> failedHosts = new HashSet<>();
 
     private final List<SSTableReader> sstables = new ArrayList<>();
-    private final Multimap<InetAddressAndPort, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();
+    private final Multimap<InetAddressAndPort, OutgoingStream> streamingDetails = HashMultimap.create();
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
     {
@@ -131,8 +131,8 @@ public class SSTableLoader implements StreamEventHandler
                                                   List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                                                   long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
                                                   Ref<SSTableReader> ref = sstable.ref();
-                                                  StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys);
-                                                  streamingDetails.put(endpoint, details);
+                                                  OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);
+                                                  streamingDetails.put(endpoint, stream);
                                               }
 
                                               // to conserve heap space when bulk loading
@@ -160,7 +160,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);
@@ -178,15 +178,15 @@ public class SSTableLoader implements StreamEventHandler
             if (toIgnore.contains(remote))
                 continue;
 
-            List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
+            List<OutgoingStream> streams = new LinkedList<>();
 
             // references are acquired when constructing the SSTableStreamingSections above
-            for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+            for (OutgoingStream stream : streamingDetails.get(remote))
             {
-                endpointDetails.add(details);
+                streams.add(stream);
             }
 
-            plan.transferFiles(remote, endpointDetails);
+            plan.transferStreams(remote, streams);
         }
         plan.listeners(this, listeners);
         return plan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 2c4fae4..8d58673 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -52,7 +52,6 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea
         InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom);
         StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
                                          1, false,
-                                         false,
                                          pendingRepair,
                                          previewKind)
                           .listeners(this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 60d571b..3901c75 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -61,7 +61,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences)
     {
-        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
                           .listeners(this)
                           .flushBeforeTransfer(pendingRepair == null)
                           .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);  // request ranges from the remote node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 0122b31..725e84d 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -79,7 +79,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
     @VisibleForTesting
     StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred)
     {
-        StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
+        StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
                .listeners(this)
                .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary
                .requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/IncomingStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
new file mode 100644
index 0000000..18bebf5
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * The counterpart of {@link OutgoingStream} on the receiving side.
+ *
+ * Data streamed in can (and should) be persisted, but must not be included in the table's
+ * live data set until added by {@link StreamReceiver}. If the stream fails, the stream receiver will
+ * delete the streamed data, but implementations still need to handle the case where its process dies
+ * during streaming, and it has data left around on startup, in which case it should be deleted.
+ */
+public interface IncomingStream
+{
+
+    /**
+     * Read in the stream data.
+     */
+    void read(DataInputPlus inputPlus, int version) throws IOException;
+
+    String getName();
+    long getSize();
+    TableId getTableId();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/OutgoingStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
new file mode 100644
index 0000000..e71b985
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Some subset of data to be streamed. Implementations handle writing out their data via the write method.
+ * On the receiving end, {@link IncomingStream} streams the data in.
+ *
+ * All the data contained in a given stream needs to have the same repairedAt timestamp (or 0) and pendingRepair
+ * id (or null).
+ */
+public interface OutgoingStream
+{
+    /**
+     * Write the stream's data into the socket
+     */
+    void write(StreamSession session, DataOutputStreamPlus output, int version) throws IOException;
+
+    /**
+     * Release any resources held by the stream
+     */
+    void finish();
+
+    long getRepairedAt();
+    UUID getPendingRepair();
+
+    String getName();
+    long getSize();
+    TableId getTableId();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/PreviewKind.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
index 3b4d2a0..51c5746 100644
--- a/src/java/org/apache/cassandra/streaming/PreviewKind.java
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -21,26 +21,19 @@ package org.apache.cassandra.streaming;
 
 import java.util.UUID;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-
 public enum PreviewKind
 {
-    NONE(0, null),
-    ALL(1, Predicates.alwaysTrue()),
-    UNREPAIRED(2, Predicates.not(SSTableReader::isRepaired)),
-    REPAIRED(3, SSTableReader::isRepaired);
+    NONE(0),
+    ALL(1),
+    UNREPAIRED(2),
+    REPAIRED(3);
 
     private final int serializationVal;
-    private final Predicate<SSTableReader> streamingPredicate;
 
-    PreviewKind(int serializationVal, Predicate<SSTableReader> streamingPredicate)
+    PreviewKind(int serializationVal)
     {
         assert ordinal() == serializationVal;
         this.serializationVal = serializationVal;
-        this.streamingPredicate = streamingPredicate;
     }
 
     public int getSerializationVal()
@@ -53,10 +46,6 @@ public enum PreviewKind
         return values()[serializationVal];
     }
 
-    public Predicate<SSTableReader> getStreamingPredicate()
-    {
-        return streamingPredicate;
-    }
 
     public boolean isPreview()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index 2334599..ac91855 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -24,7 +24,7 @@ import com.google.common.base.Objects;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
- * ProgressInfo contains file transfer progress.
+ * ProgressInfo contains stream transfer progress.
  */
 public class ProgressInfo implements Serializable
 {
@@ -69,7 +69,7 @@ public class ProgressInfo implements Serializable
     }
 
     /**
-     * @return true if file transfer is completed
+     * @return true if transfer is completed
      */
     public boolean isCompleted()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index bbca753..4b4bbed 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -70,7 +70,7 @@ public final class SessionInfo implements Serializable
     }
 
     /**
-     * Update progress of receiving/sending file.
+     * Update progress of receiving/sending stream.
      *
      * @param newProgress new progress info
      */
@@ -157,11 +157,11 @@ public final class SessionInfo implements Serializable
         return getTotalSizes(sendingSummaries);
     }
 
-    private long getTotalSizeInProgress(Collection<ProgressInfo> files)
+    private long getTotalSizeInProgress(Collection<ProgressInfo> streams)
     {
         long total = 0;
-        for (ProgressInfo file : files)
-            total += file.currentBytes;
+        for (ProgressInfo stream : streams)
+            total += stream.currentBytes;
         return total;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index a22e07d..139488d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -46,18 +46,18 @@ public class StreamCoordinator
     private final boolean connectSequentially;
 
     private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
+    private final StreamOperation streamOperation;
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
-    private final boolean keepSSTableLevel;
     private Iterator<StreamSession> sessionsToConnect = null;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
-    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory,
+    public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory,
                              boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
     {
+        this.streamOperation = streamOperation;
         this.connectionsPerHost = connectionsPerHost;
-        this.keepSSTableLevel = keepSSTableLevel;
         this.factory = factory;
         this.connectSequentially = connectSequentially;
         this.pendingRepair = pendingRepair;
@@ -191,51 +191,47 @@ public class StreamCoordinator
         return result;
     }
 
-    public synchronized void transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    public synchronized void transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams)
     {
         HostStreamingData sessionList = getOrCreateHostData(to);
 
         if (connectionsPerHost > 1)
         {
-            List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
+            List<Collection<OutgoingStream>> buckets = bucketStreams(streams);
 
-            for (List<StreamSession.SSTableStreamingSections> subList : buckets)
+            for (Collection<OutgoingStream> bucket : buckets)
             {
                 StreamSession session = sessionList.getOrCreateNextSession(to, to);
-                session.addTransferFiles(subList);
+                session.addTransferStreams(bucket);
             }
         }
         else
         {
             StreamSession session = sessionList.getOrCreateNextSession(to, to);
-            session.addTransferFiles(sstableDetails);
+            session.addTransferStreams(streams);
         }
     }
 
-    private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    private List<Collection<OutgoingStream>> bucketStreams(Collection<OutgoingStream> streams)
     {
         // There's no point in divvying things up into more buckets than we have sstableDetails
-        int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
-        int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
+        int targetSlices = Math.min(streams.size(), connectionsPerHost);
+        int step = Math.round((float) streams.size() / (float) targetSlices);
         int index = 0;
 
-        List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
-        List<StreamSession.SSTableStreamingSections> slice = null;
-        Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator();
-        while (iter.hasNext())
-        {
-            StreamSession.SSTableStreamingSections streamSession = iter.next();
+        List<Collection<OutgoingStream>> result = new ArrayList<>();
+        List<OutgoingStream> slice = null;
 
+        for (OutgoingStream stream: streams)
+        {
             if (index % step == 0)
             {
                 slice = new ArrayList<>();
                 result.add(slice);
             }
-            slice.add(streamSession);
+            slice.add(stream);
             ++index;
-            iter.remove();
         }
-
         return result;
     }
 
@@ -302,7 +298,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair, previewKind);
+                StreamSession session = new StreamSession(streamOperation, peer, connecting, factory, streamSessions.size(), pendingRepair, previewKind);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -334,7 +330,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair, previewKind);
+                session = new StreamSession(streamOperation, peer, connecting, factory, id, pendingRepair, previewKind);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHook.java b/src/java/org/apache/cassandra/streaming/StreamHook.java
index d610297..86b5182 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHook.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHook.java
@@ -18,19 +18,17 @@
 
 package org.apache.cassandra.streaming;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
 
 public interface StreamHook
 {
     public static final StreamHook instance = createHook();
 
-    public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message);
+    public OutgoingStreamMessage reportOutgoingStream(StreamSession session, OutgoingStream stream, OutgoingStreamMessage message);
     public void reportStreamFuture(StreamSession session, StreamResultFuture future);
-    public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber);
+    public void reportIncomingStream(TableId tableId, IncomingStream stream, StreamSession session, int sequenceNumber);
 
     static StreamHook createHook()
     {
@@ -43,14 +41,14 @@ public interface StreamHook
         {
             return new StreamHook()
             {
-                public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message)
+                public OutgoingStreamMessage reportOutgoingStream(StreamSession session, OutgoingStream stream, OutgoingStreamMessage message)
                 {
                     return message;
                 }
 
                 public void reportStreamFuture(StreamSession session, StreamResultFuture future) {}
 
-                public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber) {}
+                public void reportIncomingStream(TableId tableId, IncomingStream stream, StreamSession session, int sequenceNumber) {}
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 43e9068..98d68ce 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -48,19 +48,19 @@ public class StreamPlan
      */
     public StreamPlan(StreamOperation streamOperation)
     {
-        this(streamOperation, 1, false, false, NO_PENDING_REPAIR, PreviewKind.NONE);
+        this(streamOperation, 1, false, NO_PENDING_REPAIR, PreviewKind.NONE);
     }
 
-    public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially)
+    public StreamPlan(StreamOperation streamOperation, boolean connectSequentially)
     {
-        this(streamOperation, 1, keepSSTableLevels, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
+        this(streamOperation, 1, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
     }
 
-    public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels,
+    public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
                       boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
     {
         this.streamOperation = streamOperation;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(),
+        this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, new DefaultConnectionFactory(),
                                                  connectSequentially, pendingRepair, previewKind);
     }
 
@@ -137,18 +137,16 @@ public class StreamPlan
     }
 
     /**
-     * Add transfer task to send given SSTable files.
+     * Add transfer task to send given streams
      *
      * @param to endpoint address of receiver
-     * @param sstableDetails sstables with file positions and estimated key count.
-     *                       this collection will be modified to remove those files that are successfully handed off
+     * @param streams streams to send
      * @return this object for chaining
      */
-    public StreamPlan transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    public StreamPlan transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams)
     {
-        coordinator.transferFiles(to, sstableDetails);
+        coordinator.transferStreams(to, streams);
         return this;
-
     }
 
     public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
deleted file mode 100644
index f4eb9c4..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.util.Collection;
-import java.util.UUID;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
-import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
-import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.streaming.messages.StreamMessage;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * StreamReader reads from stream and writes to SSTable.
- */
-public class StreamReader
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
-    protected final TableId tableId;
-    protected final long estimatedKeys;
-    protected final Collection<Pair<Long, Long>> sections;
-    protected final StreamSession session;
-    protected final Version inputVersion;
-    protected final long repairedAt;
-    protected final UUID pendingRepair;
-    protected final SSTableFormat.Type format;
-    protected final int sstableLevel;
-    protected final SerializationHeader.Component header;
-    protected final int fileSeqNum;
-
-    public StreamReader(FileMessageHeader header, StreamSession session)
-    {
-        if (session.getPendingRepair() != null)
-        {
-            // we should only ever be streaming pending repair
-            // sstables if the session has a pending repair id
-            assert session.getPendingRepair().equals(header.pendingRepair);
-        }
-        this.session = session;
-        this.tableId = header.tableId;
-        this.estimatedKeys = header.estimatedKeys;
-        this.sections = header.sections;
-        this.inputVersion = header.version;
-        this.repairedAt = header.repairedAt;
-        this.pendingRepair = header.pendingRepair;
-        this.format = header.format;
-        this.sstableLevel = header.sstableLevel;
-        this.header = header.header;
-        this.fileSeqNum = header.sequenceNumber;
-    }
-
-    /**
-     * @param inputPlus where this reads data from
-     * @return SSTable transferred
-     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
-     */
-    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
-    {
-        long totalSize = totalSize();
-
-        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
-        if (cfs == null)
-        {
-            // schema was dropped during streaming
-            throw new IOException("CF " + tableId + " was dropped during streaming");
-        }
-
-        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
-                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
-                     cfs.getTableName(), pendingRepair);
-
-        StreamDeserializer deserializer = null;
-        SSTableMultiWriter writer = null;
-        try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION))
-        {
-            TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream);
-            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
-            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
-            while (in.getBytesRead() < totalSize)
-            {
-                writePartition(deserializer, writer);
-                // TODO move this to BytesReadTracker
-                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
-            }
-            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
-                         session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
-            return writer;
-        }
-        catch (Throwable e)
-        {
-            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
-            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e);
-            if (writer != null)
-            {
-                writer.abort(e);
-            }
-            throw Throwables.propagate(e);
-        }
-    }
-
-    protected SerializationHeader getHeader(TableMetadata metadata)
-    {
-        return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader
-    }
-
-    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException
-    {
-        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
-        if (localDir == null)
-            throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
-
-        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
-        StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
-        return writer;
-    }
-
-    protected long totalSize()
-    {
-        long size = 0;
-        for (Pair<Long, Long> section : sections)
-            size += section.right - section.left;
-        return size;
-    }
-
-    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException
-    {
-        writer.append(deserializer.newPartition());
-        deserializer.checkForExceptions();
-    }
-
-    public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
-    {
-        private final TableMetadata metadata;
-        private final DataInputPlus in;
-        private final SerializationHeader header;
-        private final SerializationHelper helper;
-
-        private DecoratedKey key;
-        private DeletionTime partitionLevelDeletion;
-        private SSTableSimpleIterator iterator;
-        private Row staticRow;
-        private IOException exception;
-
-        public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException
-        {
-            this.metadata = metadata;
-            this.in = in;
-            this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
-            this.header = header;
-        }
-
-        public StreamDeserializer newPartition() throws IOException
-        {
-            key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
-            partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
-            iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
-            staticRow = iterator.readStaticRow();
-            return this;
-        }
-
-        public TableMetadata metadata()
-        {
-            return metadata;
-        }
-
-        public RegularAndStaticColumns columns()
-        {
-            // We don't know which columns we'll get so assume it can be all of them
-            return metadata.regularAndStaticColumns();
-        }
-
-        public boolean isReverseOrder()
-        {
-            return false;
-        }
-
-        public DecoratedKey partitionKey()
-        {
-            return key;
-        }
-
-        public DeletionTime partitionLevelDeletion()
-        {
-            return partitionLevelDeletion;
-        }
-
-        public Row staticRow()
-        {
-            return staticRow;
-        }
-
-        public EncodingStats stats()
-        {
-            return header.stats();
-        }
-
-        public boolean hasNext()
-        {
-            try
-            {
-                return iterator.hasNext();
-            }
-            catch (IOError e)
-            {
-                if (e.getCause() != null && e.getCause() instanceof IOException)
-                {
-                    exception = (IOException)e.getCause();
-                    return false;
-                }
-                throw e;
-            }
-        }
-
-        public Unfiltered next()
-        {
-            // Note that in practice we know that IOException will be thrown by hasNext(), because that's
-            // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily
-            // to what we do in hasNext)
-            Unfiltered unfiltered = iterator.next();
-            return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
-                   ? maybeMarkLocalToBeCleared((Row) unfiltered)
-                   : unfiltered;
-        }
-
-        private Row maybeMarkLocalToBeCleared(Row row)
-        {
-            return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
-        }
-
-        public void checkForExceptions() throws IOException
-        {
-            if (exception != null)
-                throw exception;
-        }
-
-        public void close()
-        {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index b823311..49beba1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,38 +17,16 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.View;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
  * Task that manages receiving files for the session for certain ColumnFamily.
@@ -59,69 +37,51 @@ public class StreamReceiveTask extends StreamTask
 
     private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));
 
-    private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
+    private final StreamReceiver receiver;
 
-    // number of files to receive
-    private final int totalFiles;
-    // total size of files to receive
-    private final long totalSize;
+    // number of streams to receive
+    private final int totalStreams;
 
-    // Transaction tracking new files received
-    private final LifecycleTransaction txn;
+    // total size of streams to receive
+    private final long totalSize;
 
     // true if task is done (either completed or aborted)
     private volatile boolean done = false;
 
-    //  holds references to SSTables received
-    protected Collection<SSTableReader> sstables;
-
-    private int remoteSSTablesReceived = 0;
+    private int remoteStreamsReceived = 0;
 
-    public StreamReceiveTask(StreamSession session, TableId tableId, int totalFiles, long totalSize)
+    public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize)
     {
         super(session, tableId);
-        this.totalFiles = totalFiles;
+        this.receiver = ColumnFamilyStore.getIfExists(tableId).getStreamManager().createStreamReceiver(session, totalStreams);
+        this.totalStreams = totalStreams;
         this.totalSize = totalSize;
-        // this is an "offline" transaction, as we currently manually expose the sstables once done;
-        // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes
-        this.txn = LifecycleTransaction.offline(OperationType.STREAM);
-        this.sstables = new ArrayList<>(totalFiles);
     }
 
     /**
-     * Process received file.
+     * Process received stream.
      *
-     * @param sstable SSTable file received.
+     * @param stream Stream received.
      */
-    public synchronized void received(SSTableMultiWriter sstable)
+    public synchronized void received(IncomingStream stream)
     {
         Preconditions.checkState(!session.isPreview(), "we should never receive sstables when previewing");
 
         if (done)
         {
-            logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
-                        sstable.getFilename());
-            Throwables.maybeFail(sstable.abort(null));
+            logger.warn("[{}] Received stream {} on already finished stream received task. Aborting stream.", session.planId(),
+                        stream.getName());
+            receiver.discardStream(stream);
             return;
         }
 
-        remoteSSTablesReceived++;
-        assert tableId.equals(sstable.getTableId());
-        logger.debug("recevied {} of {} total files", remoteSSTablesReceived, totalFiles);
+        remoteStreamsReceived++;
+        Preconditions.checkArgument(tableId.equals(stream.getTableId()));
+        logger.debug("recevied {} of {} total files", remoteStreamsReceived, totalStreams);
 
-        Collection<SSTableReader> finished = null;
-        try
-        {
-            finished = sstable.finish(true);
-        }
-        catch (Throwable t)
-        {
-            Throwables.maybeFail(sstable.abort(t));
-        }
-        txn.update(finished, false);
-        sstables.addAll(finished);
+        receiver.received(stream);
 
-        if (remoteSSTablesReceived == totalFiles)
+        if (remoteStreamsReceived == totalStreams)
         {
             done = true;
             executor.submit(new OnCompletionRunnable(this));
@@ -130,7 +90,7 @@ public class StreamReceiveTask extends StreamTask
 
     public int getTotalNumberOfFiles()
     {
-        return totalFiles;
+        return totalStreams;
     }
 
     public long getTotalSize()
@@ -138,11 +98,11 @@ public class StreamReceiveTask extends StreamTask
         return totalSize;
     }
 
-    public synchronized LifecycleTransaction getTransaction()
+    public synchronized StreamReceiver getReceiver()
     {
         if (done)
             throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), tableId));
-        return txn;
+        return receiver;
     }
 
     private static class OnCompletionRunnable implements Runnable
@@ -154,116 +114,19 @@ public class StreamReceiveTask extends StreamTask
             this.task = task;
         }
 
-        /*
-         * We have a special path for views and for CDC.
-         *
-         * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
-         * through the same write path as normal mutations. This also ensures any 2is are also updated.
-         *
-         * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
-         * can be archived by the CDC process on discard.
-         */
-        private boolean requiresWritePath(ColumnFamilyStore cfs) {
-            return hasCDC(cfs) || (task.session.streamOperation().requiresViewBuild() && hasViews(cfs));
-        }
-
-        private boolean hasViews(ColumnFamilyStore cfs)
-        {
-            return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
-        }
-
-        private boolean hasCDC(ColumnFamilyStore cfs)
-        {
-            return cfs.metadata().params.cdc;
-        }
-
-        private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
-            boolean hasCdc = hasCDC(cfs);
-            ColumnFilter filter = ColumnFilter.all(cfs.metadata());
-            for (SSTableReader reader : readers)
-            {
-                Keyspace ks = Keyspace.open(reader.getKeyspaceName());
-                // When doing mutation-based repair we split each partition into smaller batches
-                // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure
-                try (ISSTableScanner scanner = reader.getScanner();
-                     CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
-                {
-                    while (throttledPartitions.hasNext())
-                    {
-                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
-                        // before transaction is done.
-                        //
-                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
-                        // so they get archived into the cdc_raw folder
-                        ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
-                                 hasCdc,
-                                 true,
-                                 false);
-                    }
-                }
-            }
-        }
-
         public void run()
         {
-            ColumnFamilyStore cfs = null;
-            boolean requiresWritePath = false;
             try
             {
-                cfs = ColumnFamilyStore.getIfExists(task.tableId);
-                if (cfs == null)
+                if (ColumnFamilyStore.getIfExists(task.tableId) == null)
                 {
                     // schema was dropped during streaming
-                    task.sstables.clear();
-                    task.abortTransaction();
+                    task.receiver.abort();
                     task.session.taskCompleted(task);
                     return;
                 }
 
-                requiresWritePath = requiresWritePath(cfs);
-                Collection<SSTableReader> readers = task.sstables;
-
-                try (Refs<SSTableReader> refs = Refs.ref(readers))
-                {
-                    if (requiresWritePath)
-                    {
-                        sendThroughWritePath(cfs, readers);
-                    }
-                    else
-                    {
-                        task.finishTransaction();
-
-                        // add sstables (this will build secondary indexes too, see CASSANDRA-10130)
-                        logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers);
-                        cfs.addSSTables(readers);
-
-                        //invalidate row and counter cache
-                        if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
-                        {
-                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
-                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
-                            if (cfs.isRowCacheEnabled())
-                            {
-                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                                if (invalidatedKeys > 0)
-                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
-                                                 cfs.keyspace.getName(), cfs.getTableName());
-                            }
-
-                            if (cfs.metadata().isCounter())
-                            {
-                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                                if (invalidatedKeys > 0)
-                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
-                                                 cfs.keyspace.getName(), cfs.getTableName());
-                            }
-                        }
-                    }
-                }
+                task.receiver.finished();;
                 task.session.taskCompleted(task);
             }
             catch (Throwable t)
@@ -273,14 +136,7 @@ public class StreamReceiveTask extends StreamTask
             }
             finally
             {
-                // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete
-                // the streamed sstables.
-                if (requiresWritePath)
-                {
-                    if (cfs != null)
-                        cfs.forceBlockingFlush();
-                    task.abortTransaction();
-                }
+                task.receiver.cleanup();
             }
         }
     }
@@ -297,17 +153,6 @@ public class StreamReceiveTask extends StreamTask
             return;
 
         done = true;
-        abortTransaction();
-        sstables.clear();
-    }
-
-    private synchronized void abortTransaction()
-    {
-        txn.abort();
-    }
-
-    private synchronized void finishTransaction()
-    {
-        txn.finish();
+        receiver.abort();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReceiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiver.java b/src/java/org/apache/cassandra/streaming/StreamReceiver.java
new file mode 100644
index 0000000..bc357ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+/**
+ * StreamReceiver acts as a staging area for incoming data. Received data
+ * ends up here, and is kept separate from the live data until all streams
+ * for a session have been received successfully.
+ */
+public interface StreamReceiver
+{
+    /**
+     * Called after we've finished receiving stream data. The data covered by the given stream should
+     * be kept isolated from the live dataset for its table.
+     */
+    void received(IncomingStream stream);
+
+    /**
+     * This is called when we've received stream data we can't add to the received set for some reason,
+     * usually when we've received data for a session which has been closed. The data backing this stream
+     * should be deleted, and any resources associated with the given stream should be released.
+     */
+    void discardStream(IncomingStream stream);
+
+    /**
+     * Called when something went wrong with a stream session. All data associated with this receiver
+     * should be deleted, and any associated resources should be cleaned up.
+     */
+    void abort();
+
+    /**
+     * Called when a stream session has successfully completed. All stream data being held by this receiver
+     * should be added to the live data sets for their respective tables before this method returns.
+     */
+    void finished();
+
+    /**
+     * Called after finished has returned and we've sent any messages to other nodes. Mainly for
+     * signaling that materialized views (MVs) and CDC should clean up.
+     */
+    void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 544f37f..3b11fb6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -73,9 +73,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair, PreviewKind previewKind)
+    private StreamResultFuture(UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
     {
-        this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
+        this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
     }
 
     static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
@@ -106,7 +106,6 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     StreamOperation streamOperation,
                                                                     InetAddressAndPort from,
                                                                     Channel channel,
-                                                                    boolean keepSSTableLevel,
                                                                     UUID pendingRepair,
                                                                     PreviewKind previewKind)
     {
@@ -116,7 +115,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind);
+            future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachConnection(from, sessionIndex, channel);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org