Posted to commits@cassandra.apache.org by al...@apache.org on 2018/07/27 18:36:02 UTC

[1/3] cassandra git commit: Stream entire SSTables when possible

Repository: cassandra
Updated Branches:
  refs/heads/trunk 6ba2fb939 -> 47a12c52a


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
new file mode 100644
index 0000000..8256ac6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.KeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraOutgoingFileTest
+{
+    public static final String KEYSPACE = "CassandraOutgoingFileTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                .caching(CachingParams.CACHE_NOTHING));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void validateFullyContainedIn_SingleContiguousRange_Succeeds()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
+
+        CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                                              sstable.getPositionsForRanges(requestedRanges),
+                                                              requestedRanges, sstable.estimatedKeys());
+
+        assertTrue(cof.contained(requestedRanges, sstable));
+    }
+
+    @Test
+    public void validateFullyContainedIn_PartialOverlap_Fails()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2)));
+
+        CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                                              sstable.getPositionsForRanges(requestedRanges),
+                                                              requestedRanges, sstable.estimatedKeys());
+
+        assertFalse(cof.contained(requestedRanges, sstable));
+    }
+
+    @Test
+    public void validateFullyContainedIn_SplitRange_Succeeds()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)),
+                                                         new Range<>(getTokenAtIndex(2), getTokenAtIndex(6)),
+                                                         new Range<>(getTokenAtIndex(5), sstable.last.getToken()));
+
+        CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                                              sstable.getPositionsForRanges(requestedRanges),
+                                                              requestedRanges, sstable.estimatedKeys());
+
+        assertTrue(cof.contained(requestedRanges, sstable));
+    }
+
+    private DecoratedKey getKeyAtIndex(int i)
+    {
+        int count = 0;
+        DecoratedKey key;
+
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            do
+            {
+                key = iter.next();
+                count++;
+            } while (iter.hasNext() && count < i);
+        }
+        return key;
+    }
+
+    private Token getTokenAtIndex(int i)
+    {
+        return getKeyAtIndex(i).getToken();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index 061a4b2..e48abf6 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -15,20 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.db.streaming;
 
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.db.streaming.CassandraStreamHeader.CassandraStreamHeaderSerializer;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.SerializationUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CassandraStreamHeaderTest
 {
@@ -37,14 +42,51 @@ public class CassandraStreamHeaderTest
     {
         String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
         TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
-        CassandraStreamHeader header = new CassandraStreamHeader(BigFormat.latestVersion,
-                                                                 SSTableFormat.Type.BIG,
-                                                                 0,
-                                                                 new ArrayList<>(),
-                                                                 ((CompressionMetadata) null),
-                                                                 0,
-                                                                 SerializationHeader.makeWithoutStats(metadata).toComponent());
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(SSTableFormat.Type.BIG)
+                                 .withSSTableVersion(BigFormat.latestVersion)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(0)
+                                 .withSections(Collections.emptyList())
+                                 .withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent())
+                                 .withTableId(metadata.id)
+                                 .build();
 
         SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
     }
+
+    @Test
+    public void serializerTest_EntireSSTableTransfer()
+    {
+        String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+        TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
+
+        ComponentManifest manifest = new ComponentManifest(new LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }});
+
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(SSTableFormat.Type.BIG)
+                                 .withSSTableVersion(BigFormat.latestVersion)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(0)
+                                 .withSections(Collections.emptyList())
+                                 .withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent())
+                                 .withComponentManifest(manifest)
+                                 .isEntireSSTable(true)
+                                 .withFirstKey(Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+                                 .withTableId(metadata.id)
+                                 .build();
+
+        SerializationUtils.assertSerializationCycle(header, new TestableCassandraStreamHeaderSerializer());
+    }
+
+    private static class TestableCassandraStreamHeaderSerializer extends CassandraStreamHeaderSerializer
+    {
+        @Override
+        public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return deserialize(in, version, tableId -> Murmur3Partitioner.instance);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
new file mode 100644
index 0000000..f478a00
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.ByteBufDataInputPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputPlus;
+import org.apache.cassandra.serializers.SerializationUtils;
+
+import static org.junit.Assert.assertNotEquals;
+
+public class ComponentManifestTest
+{
+    @Test
+    public void testSerialization()
+    {
+        ComponentManifest expected = new ComponentManifest(new LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }});
+        SerializationUtils.assertSerializationCycle(expected, ComponentManifest.serializer);
+    }
+
+    @Test(expected = EOFException.class)
+    public void testSerialization_FailsOnBadBytes() throws IOException
+    {
+        ByteBuf buf = Unpooled.buffer(512);
+        ComponentManifest expected = new ComponentManifest(new LinkedHashMap<Component, Long>() {{ put(Component.DATA, 100L); }});
+
+        DataOutputPlus output = new ByteBufDataOutputPlus(buf);
+        ComponentManifest.serializer.serialize(expected, output, MessagingService.VERSION_40);
+
+        buf.setInt(0, -100);
+
+        DataInputPlus input = new ByteBufDataInputPlus(buf);
+        ComponentManifest actual = ComponentManifest.serializer.deserialize(input, MessagingService.VERSION_40);
+
+        assertNotEquals(expected, actual);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index cee8802..fccb344 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,10 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW(Descriptor desc)
         {
-            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)));
+            this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null,
+                                               new SerializationHeader(true, cfs.metadata(),
+                                                                       cfs.metadata().regularAndStaticColumns(),
+                                                                       EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(Descriptor desc, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index fcc9191..c61ee1f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -318,6 +319,7 @@ public class LegacySSTableTest
         List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER,
                                                                                     sstable.ref(),
                                                                                     sstable.getPositionsForRanges(ranges),
+                                                                                    ranges,
                                                                                     sstable.estimatedKeysForRanges(ranges)));
         new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
new file mode 100644
index 0000000..c3931e0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class BigTableZeroCopyWriterTest
+{
+    public static final String KEYSPACE1 = "BigTableBlockWriterTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_STANDARD2 = "Standard2";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+    public static SSTableReader sstable;
+    public static ColumnFamilyStore store;
+    private static int expectedRowCount;
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                .caching(CachingParams.CACHE_NOTHING));
+
+        String ks = KEYSPACE1;
+        String cf = "Standard1";
+
+        // clear and create just one sstable for this test
+        Keyspace keyspace = Keyspace.open(ks);
+        store = keyspace.getColumnFamilyStore(cf);
+        store.clearUnsafe();
+        store.disableAutoCompaction();
+
+        DecoratedKey firstKey = null, lastKey = null;
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            if (firstKey == null)
+                firstKey = key;
+            if (lastKey == null)
+                lastKey = key;
+            if (store.metadata().partitionKeyType.compare(lastKey.getKey(), key.getKey()) < 0)
+                lastKey = key;
+
+            new RowUpdateBuilder(store.metadata(), timestamp, key.getKey())
+            .clustering("col")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+            expectedRowCount++;
+        }
+        store.forceBlockingFlush();
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void writeDataFile_DataInputPlus()
+    {
+        writeDataTestCycle(buffer -> new DataInputStreamPlus(new ByteArrayInputStream(buffer.array())));
+    }
+
+    @Test
+    public void writeDataFile_RebufferingByteBufDataInputPlus()
+    {
+        writeDataTestCycle(buffer -> {
+            EmbeddedChannel channel = new EmbeddedChannel();
+            RebufferingByteBufDataInputPlus inputPlus = new RebufferingByteBufDataInputPlus(1 << 10, 1 << 20, channel.config());
+            inputPlus.append(Unpooled.wrappedBuffer(buffer));
+            return inputPlus;
+        });
+    }
+
+
+    private void writeDataTestCycle(Function<ByteBuffer, DataInputPlus> bufferMapper)
+    {
+        File dir = store.getDirectories().getDirectoryForNewSSTables();
+        Descriptor desc = store.newSSTableDescriptor(dir);
+        TableMetadataRef metadata = Schema.instance.getTableMetadataRef(desc);
+
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+        Set<Component> componentsToWrite = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX,
+                                                           Component.STATS);
+
+        BigTableZeroCopyWriter btzcw = new BigTableZeroCopyWriter(desc, metadata, txn, componentsToWrite);
+
+        for (Component component : componentsToWrite)
+        {
+            if (Files.exists(Paths.get(desc.filenameFor(component))))
+            {
+                Pair<DataInputPlus, Long> pair = getSSTableComponentData(sstable, component, bufferMapper);
+
+                btzcw.writeComponent(component.type, pair.left, pair.right);
+            }
+        }
+
+        Collection<SSTableReader> readers = btzcw.finish(true);
+
+        SSTableReader reader = readers.toArray(new SSTableReader[0])[0];
+
+        assertNotEquals(sstable.getFilename(), reader.getFilename());
+        assertEquals(sstable.estimatedKeys(), reader.estimatedKeys());
+        assertEquals(sstable.isPendingRepair(), reader.isPendingRepair());
+
+        assertRowCount(expectedRowCount);
+    }
+
+    private void assertRowCount(int expected)
+    {
+        int count = 0;
+        for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
+        {
+            DecoratedKey dk = Util.dk(String.valueOf(i));
+            UnfilteredRowIterator rowIter = sstable.iterator(dk,
+                                                             Slices.ALL,
+                                                             ColumnFilter.all(store.metadata()),
+                                                             false,
+                                                             SSTableReadsListener.NOOP_LISTENER);
+            while (rowIter.hasNext())
+            {
+                rowIter.next();
+                count++;
+            }
+        }
+        assertEquals(expected, count);
+    }
+
+    private Pair<DataInputPlus, Long> getSSTableComponentData(SSTableReader sstable, Component component,
+                                                              Function<ByteBuffer, DataInputPlus> bufferMapper)
+    {
+        FileHandle componentFile = new FileHandle.Builder(sstable.descriptor.filenameFor(component))
+                                   .bufferSize(1024).complete();
+        ByteBuffer buffer = ByteBuffer.allocate((int) componentFile.channel.size());
+        componentFile.channel.read(buffer, 0);
+        buffer.flip();
+
+        DataInputPlus inputPlus = bufferMapper.apply(buffer);
+
+        return Pair.create(inputPlus, componentFile.channel.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
index 2961d9a..69df040 100644
--- a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
+++ b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.net.async;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -28,7 +29,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 
 public class RebufferingByteBufDataInputPlusTest
 {
@@ -151,4 +154,99 @@ public class RebufferingByteBufDataInputPlusTest
         inputPlus.markClose();
         Assert.assertEquals(size, inputPlus.available());
     }
+
+    @Test
+    public void consumeUntil_SingleBuffer_Partial_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(1, 8, 0, 4);
+    }
+
+    @Test
+    public void consumeUntil_SingleBuffer_AllBytes_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(1, 8, 0, 8);
+    }
+
+    @Test
+    public void consumeUntil_MultipleBuffer_Partial_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 13);
+    }
+
+    @Test
+    public void consumeUntil_MultipleBuffer_AllBytes_HappyPath() throws IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 16);
+    }
+
+    @Test(expected = EOFException.class)
+    public void consumeUntil_SingleBuffer_Fails() throws IOException
+    {
+        consumeUntilTestCycle(1, 8, 0, 9);
+    }
+
+    @Test(expected = EOFException.class)
+    public void consumeUntil_MultipleBuffer_Fails() throws IOException
+    {
+        consumeUntilTestCycle(2, 8, 0, 17);
+    }
+
+    private void consumeUntilTestCycle(int nBuffs, int buffSize, int startOffset, int len) throws IOException
+    {
+        byte[] expectedBytes = new byte[len];
+        int count = 0;
+        for (int j=0; j < nBuffs; j++)
+        {
+            ByteBuf buf = channel.alloc().buffer(buffSize);
+            for (int i = 0; i < buf.capacity(); i++)
+            {
+                buf.writeByte(j);
+                if (count >= startOffset && (count - startOffset) < len)
+                    expectedBytes[count - startOffset] = (byte)j;
+                count++;
+            }
+
+            inputPlus.append(buf);
+        }
+        inputPlus.append(channel.alloc().buffer(0));
+
+        TestableWritableByteChannel wbc = new TestableWritableByteChannel(len);
+
+        inputPlus.skipBytesFully(startOffset);
+        BufferedDataOutputStreamPlus writer = new BufferedDataOutputStreamPlus(wbc);
+        inputPlus.consumeUntil(writer, len);
+
+        Assert.assertEquals(String.format("Test with %d buffers starting at %d consuming %d bytes", nBuffs, startOffset, len),
+                            len, wbc.writtenBytes.readableBytes());
+
+        Assert.assertArrayEquals(expectedBytes, wbc.writtenBytes.array());
+    }
+
+    private static class TestableWritableByteChannel implements WritableByteChannel
+    {
+        private boolean isOpen = true;
+        public ByteBuf writtenBytes;
+
+        public TestableWritableByteChannel(int initialCapacity)
+        {
+             writtenBytes = Unpooled.buffer(initialCapacity);
+        }
+
+        public int write(ByteBuffer src) throws IOException
+        {
+            int size = src.remaining();
+            writtenBytes.writeBytes(src);
+            return size;
+        }
+
+        public boolean isOpen()
+        {
+            return isOpen;
+        }
+
+        public void close() throws IOException
+        {
+            isOpen = false;
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
index 7ce4ec5..b88b56f 100644
--- a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
+++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
@@ -63,6 +63,5 @@ public class SerializationUtils
     public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer)
     {
         assertSerializationCycle(src, serializer, MessagingService.current_version);
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 78b3094..8ebe333 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -92,7 +92,7 @@ public class StreamTransferTaskTest
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
-            task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), 1));
+            task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), ranges, 1));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 
@@ -144,7 +144,7 @@ public class StreamTransferTaskTest
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
             Ref<SSTableReader> ref = sstable.selfRef();
             refs.add(ref);
-            task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), 1));
+            task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), ranges, 1));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 72b9cbe..bc501be 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -278,6 +278,7 @@ public class StreamingTransferTest
             streams.add(new CassandraOutgoingFile(operation,
                                                   sstables.get(sstable),
                                                   sstable.getPositionsForRanges(ranges),
+                                                  ranges,
                                                   sstable.estimatedKeysForRanges(ranges)));
         }
         return streams;




[2/3] cassandra git commit: Stream entire SSTables when possible

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
deleted file mode 100644
index c5b0c53..0000000
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.streaming;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * CassandraStreamWriter for compressed SSTable.
- */
-public class CompressedCassandraStreamWriter extends CassandraStreamWriter
-{
-    private static final int CHUNK_SIZE = 1 << 16;
-
-    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamWriter.class);
-
-    private final CompressionInfo compressionInfo;
-
-    public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
-    {
-        super(sstable, sections, session);
-        this.compressionInfo = compressionInfo;
-    }
-
-    @Override
-    public void write(DataOutputStreamPlus out) throws IOException
-    {
-        assert out instanceof ByteBufDataOutputStreamPlus;
-        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
-        long totalSize = totalSize();
-        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
-                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
-        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
-        {
-            long progress = 0L;
-            // calculate chunks to transfer. we want to send continuous chunks altogether.
-            List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
-
-            int sectionIdx = 0;
-
-            // stream each of the required sections of the file
-            for (final SSTableReader.PartitionPositionBounds section : sections)
-            {
-                // length of the section to stream
-                long length = section.upperPosition - section.lowerPosition;
-
-                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
-
-                // tracks write progress
-                long bytesTransferred = 0;
-                while (bytesTransferred < length)
-                {
-                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
-                    limiter.acquire(toTransfer);
-
-                    ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
-                    long lastWrite;
-                    try
-                    {
-                        lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred);
-                        assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
-                        outBuffer.flip();
-                        output.writeToChannel(outBuffer);
-                    }
-                    catch (IOException e)
-                    {
-                        FileUtils.clean(outBuffer);
-                        throw e;
-                    }
-
-                    bytesTransferred += lastWrite;
-                    progress += lastWrite;
-                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
-                }
-            }
-            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
-                         session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
-        }
-    }
-
-    @Override
-    protected long totalSize()
-    {
-        long size = 0;
-        // calculate total length of transferring chunks
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
-    }
-
-    // chunks are assumed to be sorted by offset
-    private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
-    {
-        List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
-        SSTableReader.PartitionPositionBounds lastSection = null;
-        for (CompressionMetadata.Chunk chunk : chunks)
-        {
-            if (lastSection != null)
-            {
-                if (chunk.offset == lastSection.upperPosition)
-                {
-                    // extend previous section to end of this chunk
-                    lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
-                }
-                else
-                {
-                    transferSections.add(lastSection);
-                    lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
-                }
-            }
-            else
-            {
-                lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
-            }
-        }
-        if (lastSection != null)
-            transferSections.add(lastSection);
-        return transferSections;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index 2f56786..c0278e8 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -60,7 +60,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
     private long bufferOffset = 0;
 
     /**
-     * The current {@link CompressedCassandraStreamReader#sections} offset in the stream.
+     * The current {@link CassandraCompressedStreamReader#sections} offset in the stream.
      */
     private long current = 0;
 
@@ -98,7 +98,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
     }
 
     /**
-     * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}.
+     * Invoked when crossing into the next stream boundary in {@link CassandraCompressedStreamReader#sections}.
      */
     public void position(long position) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
new file mode 100644
index 0000000..cf93bc2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+/**
+ * This interface is used by the streaming code to read an SSTable stream off a channel.
+ */
+public interface IStreamReader
+{
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
+}
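
For illustration, a minimal sketch of what an implementation of this interface can look like (hypothetical class and byte-draining body; the actual readers in this commit, such as CassandraCompressedStreamReader referenced earlier in the patch, deserialize the stream into the writer rather than discarding it):

    package org.apache.cassandra.db.streaming;

    import java.io.IOException;

    import org.apache.cassandra.io.sstable.SSTableMultiWriter;
    import org.apache.cassandra.io.util.DataInputPlus;

    // Hypothetical example only: drains a known number of bytes off the channel and
    // returns the supplied writer unchanged. Real readers build the SSTable (or copy
    // whole components) from those bytes instead of discarding them.
    public class DiscardingStreamReader implements IStreamReader
    {
        private final SSTableMultiWriter writer;
        private final long bytesExpected;

        public DiscardingStreamReader(SSTableMultiWriter writer, long bytesExpected)
        {
            this.writer = writer;
            this.bytesExpected = bytesExpected;
        }

        public SSTableMultiWriter read(DataInputPlus in) throws IOException
        {
            byte[] buffer = new byte[1 << 16];
            long remaining = bytesExpected;
            while (remaining > 0)
            {
                int chunk = (int) Math.min(buffer.length, remaining);
                in.readFully(buffer, 0, chunk);
                remaining -= chunk;
            }
            return writer;
        }
    }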

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 9daac7c..a81db85 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -33,6 +33,10 @@ public class Component
 
     final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
 
+    /**
+     * WARNING: Be careful when changing the names or string representations of the enum
+     * members. Streaming code depends on these names during streaming (Ref: CASSANDRA-14556).
+     */
     public enum Type
     {
         // the base data for an sstable: the remaining components can be regenerated
@@ -60,6 +64,7 @@ public class Component
         CUSTOM(null);
 
         final String repr;
+
         Type(String repr)
         {
             this.repr = repr;
@@ -120,7 +125,7 @@ public class Component
      * @return the component corresponding to {@code name}. Note that this always returns a component, as an unrecognized
      * name is parsed into a CUSTOM component.
      */
-    static Component parse(String name)
+    public static Component parse(String name)
     {
         Type type = Type.fromRepresentation(name);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index ebc35e7..4ba0533 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,7 +131,7 @@ public class SSTableLoader implements StreamEventHandler
                                                   List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                                                   long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
                                                   Ref<SSTableReader> ref = sstable.ref();
-                                                  OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);
+                                                  OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, tokenRanges, estimatedKeys);
                                                   streamingDetails.put(endpoint, stream);
                                               }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
new file mode 100644
index 0000000..400f119
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);
+
+    private final TableMetadataRef metadata;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    private static final SequentialWriterOption WRITER_OPTION =
+        SequentialWriterOption.newBuilder()
+                              .trickleFsync(false)
+                              .bufferSize(2 << 20)
+                              .bufferType(BufferType.OFF_HEAP)
+                              .build();
+
+    private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
+        ImmutableSet.of(Component.DATA,
+                        Component.PRIMARY_INDEX,
+                        Component.SUMMARY,
+                        Component.STATS,
+                        Component.COMPRESSION_INFO,
+                        Component.FILTER,
+                        Component.DIGEST,
+                        Component.CRC);
+
+    public BigTableZeroCopyWriter(Descriptor descriptor,
+                                  TableMetadataRef metadata,
+                                  LifecycleTransaction txn,
+                                  final Collection<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.componentWriters = new EnumMap<>(Component.Type.class);
+
+        if (!SUPPORTED_COMPONENTS.containsAll(components))
+            throw new AssertionError(format("Unsupported streaming component detected %s",
+                                            Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));
+
+        for (Component c : components)
+            componentWriters.put(c.type, makeWriter(descriptor, c));
+    }
+
+    private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
+    {
+        return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
+    }
+
+    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    {
+        final int BUFFER_SIZE = 1 << 20;
+        long bytesRead = 0;
+        byte[] buff = new byte[BUFFER_SIZE];
+        try
+        {
+            while (bytesRead < size)
+            {
+                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+                in.readFully(buff, 0, toRead);
+                int count = Math.min(toRead, BUFFER_SIZE);
+                out.write(buff, 0, count);
+                bytesRead += count;
+            }
+            out.sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
+    }
+
+    @Override
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        throw new UnsupportedOperationException("Operation not supported by BigTableZeroCopyWriter");
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return finish(openResult);
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        setOpenResult(openResult);
+        return finished();
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        if (finalReader == null)
+            finalReader = SSTableReader.open(descriptor, components, metadata);
+
+        return ImmutableList.of(finalReader);
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        return null;
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return 0;
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return metadata.id;
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.commit(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public Throwable abort(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.abort(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public void prepareToCommit()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.prepareToCommit();
+    }
+
+    @Override
+    public void close()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.close();
+    }
+
+    public void writeComponent(Component.Type type, DataInputPlus in, long size)
+    {
+        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+
+        if (in instanceof RebufferingByteBufDataInputPlus)
+            write((RebufferingByteBufDataInputPlus) in, size, componentWriters.get(type));
+        else
+            write(in, size, componentWriters.get(type));
+    }
+
+    private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
+    {
+        logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
+
+        try
+        {
+            long bytesWritten = in.consumeUntil(writer, size);
+
+            if (bytesWritten != size)
+                throw new IOException(format("Failed to read correct number of bytes from channel %s", writer));
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, writer.getPath());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 54122ee..56d88f7 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -24,11 +24,9 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 
 import net.nicoulaj.compilecommand.annotations.DontInline;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.vint.VIntCoding;
@@ -341,7 +339,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
     }
 
     @Override
-    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException
     {
         if (strictFlushing)
             throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CheckedFunction.java b/src/java/org/apache/cassandra/io/util/CheckedFunction.java
new file mode 100644
index 0000000..ec1ce9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CheckedFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+@FunctionalInterface
+public interface CheckedFunction<T, R, E extends Exception>
+{
+    R apply(T t) throws E;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index a9dbb68..16be42f 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import com.google.common.base.Function;
-
 import org.apache.cassandra.utils.vint.VIntCoding;
 
 /**
@@ -41,7 +39,7 @@ public interface DataOutputPlus extends DataOutput
      * Safe way to operate against the underlying channel. Impossible to stash a reference to the channel
      * and forget to flush
      */
-    <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
+    <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException;
 
     default void writeVInt(long i) throws IOException
     {

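A minimal, self-contained sketch of what the CheckedFunction change enables: a lambda passed to applyToChannel can now throw IOException directly instead of wrapping it. The class name, buffer contents and destination channel below are illustrative only, not part of the patch.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;

    import org.apache.cassandra.io.util.CheckedFunction;

    public class CheckedFunctionSketch
    {
        public static void main(String[] args) throws IOException
        {
            // The lambda may throw IOException, which com.google.common.base.Function could not declare.
            CheckedFunction<WritableByteChannel, Integer, IOException> writeBuffer =
                c -> c.write(ByteBuffer.wrap(new byte[]{ 1, 2, 3 }));

            try (WritableByteChannel channel = Channels.newChannel(new ByteArrayOutputStream()))
            {
                System.out.println("wrote " + writeBuffer.apply(channel) + " bytes");
            }
        }
    }
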
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 086f5c9..ef51888 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -25,12 +25,12 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.base.Preconditions;
+
 import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
  * via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index e71f2fa..3eb1a7d 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -138,11 +138,22 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
      */
     public SequentialWriter(File file, SequentialWriterOption option)
     {
+        this(file, option, true);
+    }
+
+    /**
+     * Create a SequentialWriter for the given file with a specific writer option.
+     * @param file file to write to
+     * @param option writer option to use
+     * @param strictFlushing whether to enforce strict flushing semantics
+     */
+    public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
+    {
         super(openChannel(file), option.allocateBuffer());
-        strictFlushing = true;
-        fchannel = (FileChannel)channel;
+        this.strictFlushing = strictFlushing;
+        this.fchannel = (FileChannel)channel;
 
-        filePath = file.getAbsolutePath();
+        this.filePath = file.getAbsolutePath();
 
         this.option = option;
     }

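A minimal sketch of the new non-strict-flushing constructor, mirroring how BigTableBlockWriter opens its per-component writers; the class name, file path and writer options below are illustrative (they are not the WRITER_OPTION used by the block writer).

    import java.io.File;
    import java.io.IOException;

    import org.apache.cassandra.io.util.SequentialWriter;
    import org.apache.cassandra.io.util.SequentialWriterOption;

    public class SequentialWriterSketch
    {
        public static void main(String[] args) throws IOException
        {
            SequentialWriterOption option = SequentialWriterOption.newBuilder()
                                                                  .bufferSize(1 << 20)
                                                                  .build();

            // strictFlushing = false, as the block writer passes for streamed components
            SequentialWriter writer = new SequentialWriter(new File("/tmp/example-component.db"), option, false);
            writer.write(new byte[]{ 1, 2, 3 });
            writer.finish(); // flushes, syncs and closes the underlying channel
        }
    }
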
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
index 54b4cb1..d9ef010 100644
--- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -378,7 +378,7 @@ public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlu
     }
 
     @Override
-    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException
     {
         return f.apply(channel);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
index 0473465..a77cb07 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
@@ -22,10 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import com.google.common.base.Function;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
+import org.apache.cassandra.io.util.CheckedFunction;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
@@ -82,7 +81,7 @@ public class ByteBufDataOutputPlus extends ByteBufOutputStream implements DataOu
     }
 
     @Override
-    public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+    public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
index 3a544e4..777bc3e 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.net.async;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -35,6 +38,7 @@ import io.netty.util.concurrent.Future;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.streaming.StreamSession;
 
 /**
@@ -49,6 +53,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
     private final StreamSession session;
     private final Channel channel;
     private final int bufferSize;
+    private static final Logger logger = LoggerFactory.getLogger(ByteBufDataOutputStreamPlus.class);
 
     /**
      * Tracks how many bytes we've written to the netty channel. This more or less follows the channel's
@@ -70,7 +75,6 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
         this.channel = channel;
         this.currentBuf = buffer;
         this.bufferSize = bufferSize;
-
         channelRateLimiter = new Semaphore(channel.config().getWriteBufferHighWaterMark(), true);
     }
 
@@ -114,8 +118,9 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
         doFlush(buffer.position());
 
         int byteCount = buf.readableBytes();
+
         if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 5, TimeUnit.MINUTES))
-            throw new IOException("outbound channel was not writable");
+            throw new IOException(String.format("outbound channel was not writable; failed to acquire %d permits", byteCount));
 
         // the (possibly naive) assumption that we should always flush after each incoming buf
         ChannelFuture channelFuture = channel.writeAndFlush(buf);
@@ -135,6 +140,53 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
         return channelFuture;
     }
 
+    /**
+     * Writes all data in the given file channel to the stream, {@code bufferSize} bytes at a time,
+     * and closes the file channel when done.
+     *
+     * @param f file channel to stream from
+     * @param limiter rate limiter applied to each chunk
+     * @return number of bytes transferred
+     * @throws IOException if the transfer fails
+     */
+    public long writeToChannel(FileChannel f, StreamRateLimiter limiter) throws IOException
+    {
+        final long length = f.size();
+        long bytesTransferred = 0;
+
+        try
+        {
+            while (bytesTransferred < length)
+            {
+                int toRead = (int) Math.min(bufferSize, length - bytesTransferred);
+                NonClosingDefaultFileRegion fileRegion = new NonClosingDefaultFileRegion(f, bytesTransferred, toRead);
+
+                if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, toRead, 5, TimeUnit.MINUTES))
+                    throw new IOException(String.format("outbound channel was not writable; failed to acquire %d permits", toRead));
+
+                limiter.acquire(toRead);
+
+                bytesTransferred += toRead;
+                final boolean shouldClose = (bytesTransferred == length); // this is the last buffer, can safely close channel
+
+                channel.writeAndFlush(fileRegion).addListener(future -> {
+                    handleBuffer(future, toRead);
+
+                    if ((shouldClose || !future.isSuccess()) && f.isOpen())
+                        f.close();
+                });
+                logger.trace("{} of {} (toRead {} cs {})", bytesTransferred, length, toRead, f.isOpen());
+            }
+
+            return bytesTransferred;
+        }
+        catch (Exception e)
+        {
+            if (f.isOpen())
+                f.close();
+
+            throw e;
+        }
+    }
+
     @Override
     protected void doFlush(int count) throws IOException
     {
@@ -145,7 +197,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
             currentBuf.writerIndex(byteCount);
 
             if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
-                throw new IOException("outbound channel was not writable");
+                throw new IOException(String.format("outbound channel was not writable; failed to acquire %d permits", byteCount));
 
             channel.writeAndFlush(currentBuf).addListener(future -> handleBuffer(future, byteCount));
             currentBuf = channel.alloc().directBuffer(bufferSize, bufferSize);
@@ -161,7 +213,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
     private void handleBuffer(Future<? super Void> future, int bytesWritten)
     {
         channelRateLimiter.release(bytesWritten);
-
+        logger.trace("bytesWritten {} {} because {}", bytesWritten, (future.isSuccess() == true) ? "Succeeded" : "Failed", future.cause());
         if (!future.isSuccess() && channel.isOpen())
             session.onError(future.cause());
     }

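As a rough, hypothetical caller-side sketch of writeToChannel (not code from this patch): session, channel, peer and the file path are assumed to already exist, and StreamManager.getRateLimiter is assumed to be the existing per-peer limiter.

    // Hypothetical usage only; session, channel and peer are assumed to be established.
    FileChannel file = FileChannel.open(Paths.get("/path/to/na-1-big-Data.db"), StandardOpenOption.READ);
    ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 20);
    long sent = out.writeToChannel(file, StreamManager.getRateLimiter(peer)); // closes file after the last chunk
    out.close();
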
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
new file mode 100644
index 0000000..46f0ce1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import io.netty.channel.DefaultFileRegion;
+
+/**
+ * Netty's DefaultFileRegion closes the underlying FileChannel as soon as
+ * the refCnt() for the region drops to zero; this is an implementation of
+ * DefaultFileRegion that doesn't close the FileChannel.
+ *
+ * See {@link ByteBufDataOutputStreamPlus} for its usage.
+ */
+public class NonClosingDefaultFileRegion extends DefaultFileRegion
+{
+
+    public NonClosingDefaultFileRegion(FileChannel file, long position, long count)
+    {
+        super(file, position, count);
+    }
+
+    public NonClosingDefaultFileRegion(File f, long position, long count)
+    {
+        super(f, position, count);
+    }
+
+    @Override
+    protected void deallocate()
+    {
+        // Overridden to avoid closing the file
+    }
+}

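A self-contained sketch of the pattern this class exists for: several regions share one FileChannel, so the channel can be closed only after the last region has been written. The class name, file path and the EmbeddedChannel below are illustrative stand-ins for the real streaming setup.

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    import io.netty.channel.Channel;
    import io.netty.channel.embedded.EmbeddedChannel;
    import org.apache.cassandra.net.async.NonClosingDefaultFileRegion;

    public class NonClosingFileRegionSketch
    {
        public static void main(String[] args) throws IOException
        {
            FileChannel file = FileChannel.open(Paths.get("/tmp/example-Data.db"), StandardOpenOption.READ);
            Channel channel = new EmbeddedChannel(); // stand-in for the real streaming channel

            long length = file.size();
            long position = 0;
            int chunkSize = 1 << 20;

            while (position < length)
            {
                int toWrite = (int) Math.min(chunkSize, length - position);
                // Each region references the same FileChannel; a plain DefaultFileRegion would
                // close it when the region is released, breaking the chunks still in flight.
                NonClosingDefaultFileRegion region = new NonClosingDefaultFileRegion(file, position, toWrite);
                position += toWrite;
                boolean last = position == length;

                channel.writeAndFlush(region).addListener(future -> {
                    if (last && file.isOpen())
                        file.close(); // close only after the final chunk has been written
                });
            }
        }
    }
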
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
index 1f32aa8..4e667da 100644
--- a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
@@ -31,6 +31,7 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelConfig;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 
 public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
@@ -249,4 +250,42 @@ public class RebufferingByteBufDataInputPlus extends RebufferingInputStream impl
     {
         return channelConfig.getAllocator();
     }
+
+    /**
+     * Consumes bytes from the stream, copying them to the given writer, until {@code len} bytes have been copied.
+     *
+     * @param writer destination for the consumed bytes
+     * @param len number of bytes to consume
+     * @return number of bytes copied
+     * @throws IOException if the stream ends before {@code len} bytes could be consumed
+     */
+    public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException
+    {
+        long copied = 0; // number of bytes copied
+        while (copied < len)
+        {
+            if (buffer.remaining() == 0)
+            {
+                try
+                {
+                    reBuffer();
+                }
+                catch (EOFException e)
+                {
+                    throw new EOFException("EOF after " + copied + " bytes out of " + len);
+                }
+                if (buffer.remaining() == 0 && copied < len)
+                    throw new AssertionError("reBuffer() failed to return data");
+            }
+
+            int originalLimit = buffer.limit();
+            int toCopy = (int) Math.min(len - copied, buffer.remaining());
+            buffer.limit(buffer.position() + toCopy);
+            int written = writer.applyToChannel(c -> c.write(buffer));
+            buffer.limit(originalLimit);
+            copied += written;
+        }
+
+        return copied;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 34b0bbd..7eada28 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -282,7 +282,8 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), pendingRepair, previewKind);
+                StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(),
+                                                          pendingRepair, previewKind);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 49beba1..0a96f4c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -49,6 +49,7 @@ public class StreamReceiveTask extends StreamTask
     private volatile boolean done = false;
 
     private int remoteStreamsReceived = 0;
+    private long bytesReceived = 0;
 
     public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize)
     {
@@ -76,8 +77,10 @@ public class StreamReceiveTask extends StreamTask
         }
 
         remoteStreamsReceived++;
+        bytesReceived += stream.getSize();
         Preconditions.checkArgument(tableId.equals(stream.getTableId()));
-        logger.debug("recevied {} of {} total files", remoteStreamsReceived, totalStreams);
+        logger.debug("received {} of {} total files {} of total bytes {}", remoteStreamsReceived, totalStreams,
+                     bytesReceived, totalSize);
 
         receiver.received(stream);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ef8976d..4de63be 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,10 +17,8 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.*;
+import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import com.google.common.util.concurrent.AbstractFuture;
@@ -78,7 +76,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
     }
 
-    static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
+    public static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
                                    StreamCoordinator coordinator)
     {
         StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator);
@@ -112,8 +110,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}",
-                        planId, sessionIndex, streamOperation.getDescription(), from, channel.remoteAddress(), channel.localAddress(), channel.id());
+            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {}" +
+                        " channel.id {}", planId, sessionIndex, streamOperation.getDescription(),
+                        from, channel.remoteAddress(), channel.localAddress(), channel.id());
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
             future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c56616e..393cd24 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -202,7 +202,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
 
-        logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer, preferredPeerInetAddressAndPort);
+        logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer,
+                     preferredPeerInetAddressAndPort);
     }
 
     public UUID planId()
@@ -777,7 +778,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         FBUtilities.waitOnFutures(flushes);
     }
 
-    private synchronized void prepareReceiving(StreamSummary summary)
+    @VisibleForTesting
+    public synchronized void prepareReceiving(StreamSummary summary)
     {
         failIfFinished();
         if (summary.files > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index bff77cf..3fa80f5 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -322,7 +322,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
                     throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
 
                 // close the DataOutputStreamPlus as we're done with it - but don't close the channel
-                try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 16))
+                try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 20))
                 {
                     StreamMessage.serialize(msg, outPlus, protocolVersion, session);
                     channel.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index 81fe8cd..7c10ef9 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -62,7 +62,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
     static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
 
     private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
-    private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
+    private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 20;
 
     private final InetAddressAndPort remoteAddress;
     private final int protocolVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index fbd3e21..a591a43 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -47,7 +47,8 @@ public class StreamInitMessage extends StreamMessage
     public final UUID pendingRepair;
     public final PreviewKind previewKind;
 
-    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
+    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation,
+                             UUID pendingRepair, PreviewKind previewKind)
     {
         super(Type.STREAM_INIT);
         this.from = from;
@@ -93,7 +94,8 @@ public class StreamInitMessage extends StreamMessage
 
             UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
             PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
-            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind);
+            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description),
+                                         pendingRepair, previewKind);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -108,6 +110,7 @@ public class StreamInitMessage extends StreamMessage
                 size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
             }
             size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/utils/Collectors3.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Collectors3.java b/src/java/org/apache/cassandra/utils/Collectors3.java
new file mode 100644
index 0000000..f8f262e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Collectors3.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collector;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Some extra Collector implementations.
+ *
+ * Named Collectors3 just in case Guava ever makes a Collectors2
+ */
+public class Collectors3
+{
+    private static final Collector.Characteristics[] LIST_CHARACTERISTICS = new Collector.Characteristics[] { };
+    public static <T>  Collector<T, ?, List<T>> toImmutableList()
+    {
+        return Collector.of(ImmutableList.Builder<T>::new,
+                            ImmutableList.Builder<T>::add,
+                            (l, r) -> l.addAll(r.build()),
+                            ImmutableList.Builder<T>::build,
+                            LIST_CHARACTERISTICS);
+    }
+
+    private static final Collector.Characteristics[] SET_CHARACTERISTICS = new Collector.Characteristics[] { Collector.Characteristics.UNORDERED };
+    public static <T>  Collector<T, ?, Set<T>> toImmutableSet()
+    {
+        return Collector.of(ImmutableSet.Builder<T>::new,
+                            ImmutableSet.Builder<T>::add,
+                            (l, r) -> l.addAll(r.build()),
+                            ImmutableSet.Builder<T>::build,
+                            SET_CHARACTERISTICS);
+    }
+}

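A minimal usage sketch for the new collectors; the class name and stream contents below are illustrative.

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Stream;

    import org.apache.cassandra.utils.Collectors3;

    public class Collectors3Sketch
    {
        public static void main(String[] args)
        {
            // Collect stream elements straight into Guava immutable collections.
            List<String> components = Stream.of("Data.db", "Index.db", "Filter.db")
                                            .collect(Collectors3.toImmutableList());
            Set<Integer> levels = Stream.of(0, 1, 1, 2)
                                        .collect(Collectors3.toImmutableSet());
            System.out.println(components + " " + levels);
        }
    }
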
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 5893bab..3c09637 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -46,3 +46,5 @@ enable_user_defined_functions: true
 enable_scripted_user_defined_functions: true
 prepared_statements_cache_size_mb: 1
 corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound_megabits_per_sec: 200000000

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index bd7ef20..01e67f0 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -29,10 +29,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Keyspace;
@@ -41,6 +38,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
new file mode 100644
index 0000000..3192bcc
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.db.streaming.CassandraStreamHeader;
+import org.apache.cassandra.db.streaming.CassandraStreamReader;
+import org.apache.cassandra.db.streaming.CassandraStreamWriter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec set to a
+ * very high value; otherwise, throttling will kick in and the results will not be meaningful.
+ */
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+public class ZeroCopyStreamingBenchmark
+{
+    static final int STREAM_SIZE = 50 * 1024 * 1024;
+
+    @State(Scope.Thread)
+    public static class BenchmarkState
+    {
+        public static final String KEYSPACE = "ZeroCopyStreamingBenchmark";
+        public static final String CF_STANDARD = "Standard1";
+        public static final String CF_INDEXED = "Indexed1";
+        public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+        private static SSTableReader sstable;
+        private static ColumnFamilyStore store;
+        private StreamSession session;
+        private CassandraEntireSSTableStreamWriter blockStreamWriter;
+        private ByteBuf serializedBlockStream;
+        private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        private CassandraEntireSSTableStreamReader blockStreamReader;
+        private CassandraStreamWriter partialStreamWriter;
+        private CassandraStreamReader partialStreamReader;
+        private ByteBuf serializedPartialStream;
+
+        @Setup
+        public void setupBenchmark() throws IOException
+        {
+            Keyspace keyspace = setupSchemaAndKeySpace();
+            store = keyspace.getColumnFamilyStore("Standard1");
+            generateData();
+
+            sstable = store.getLiveSSTables().iterator().next();
+            session = setupStreamingSessionForTest();
+            blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+            CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE);
+            ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, blockStreamCaptureChannel, 1024 * 1024);
+            blockStreamWriter.write(out);
+            serializedBlockStream = blockStreamCaptureChannel.getSerializedStream();
+            out.close();
+
+            session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, serializedBlockStream.readableBytes()));
+
+            CassandraStreamHeader entireSSTableStreamHeader =
+                CassandraStreamHeader.builder()
+                                     .withSSTableFormat(sstable.descriptor.formatType)
+                                     .withSSTableVersion(sstable.descriptor.version)
+                                     .withSSTableLevel(0)
+                                     .withEstimatedKeys(sstable.estimatedKeys())
+                                     .withSections(Collections.emptyList())
+                                     .withSerializationHeader(sstable.header.toComponent())
+                                     .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                     .isEntireSSTable(true)
+                                     .withFirstKey(sstable.first)
+                                     .withTableId(sstable.metadata().id)
+                                     .build();
+
+            blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id,
+                                                                                               peer, session.planId(),
+                                                                                               0, 0, 0,
+                                                                                               null), entireSSTableStreamHeader, session);
+
+            List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken()));
+            partialStreamWriter = new CassandraStreamWriter(sstable, sstable.getPositionsForRanges(requestedRanges), session);
+
+            CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE);
+            partialStreamWriter.write(ByteBufDataOutputStreamPlus.create(session, partialStreamChannel, 1024 * 1024));
+            serializedPartialStream = partialStreamChannel.getSerializedStream();
+
+            CassandraStreamHeader partialSSTableStreamHeader =
+                CassandraStreamHeader.builder()
+                                     .withSSTableFormat(sstable.descriptor.formatType)
+                                     .withSSTableVersion(sstable.descriptor.version)
+                                     .withSSTableLevel(0)
+                                     .withEstimatedKeys(sstable.estimatedKeys())
+                                     .withSections(sstable.getPositionsForRanges(requestedRanges))
+                                     .withSerializationHeader(sstable.header.toComponent())
+                                     .withTableId(sstable.metadata().id)
+                                     .build();
+
+            partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
+                                                                                    peer, session.planId(),
+                                                                                    0, 0, 0,
+                                                                                    null),
+                                                            partialSSTableStreamHeader, session);
+        }
+
+        private Keyspace setupSchemaAndKeySpace()
+        {
+            SchemaLoader.prepareServer();
+            SchemaLoader.createKeyspace(KEYSPACE,
+                                        KeyspaceParams.simple(1),
+                                        SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                        SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                        SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                    .minIndexInterval(8)
+                                                    .maxIndexInterval(256)
+                                                    .caching(CachingParams.CACHE_NOTHING));
+
+            return Keyspace.open(KEYSPACE);
+        }
+
+        private void generateData()
+        {
+            // insert data and compact to a single sstable
+            CompactionManager.instance.disableAutoCompaction();
+            for (int j = 0; j < 1_000_000; j++)
+            {
+                new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+                .clustering("0")
+                .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                .build()
+                .applyUnsafe();
+            }
+            store.forceBlockingFlush();
+            CompactionManager.instance.performMaximal(store, false);
+        }
+
+        @TearDown
+        public void tearDown() throws IOException
+        {
+            SchemaLoader.cleanupAndLeaveDirs();
+            CommitLog.instance.stopUnsafe(true);
+        }
+
+        private StreamSession setupStreamingSessionForTest()
+        {
+            StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+            StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+
+            InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+            streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+            StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+            session.init(future);
+            return session;
+        }
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void blockStreamWriter(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024);
+        state.blockStreamWriter.write(out);
+        out.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void blockStreamReader(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config());
+        in.append(state.serializedBlockStream.retainedDuplicate());
+        SSTableMultiWriter sstableWriter = state.blockStreamReader.read(in);
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+        in.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void partialStreamWriter(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024);
+        state.partialStreamWriter.write(out);
+        out.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void partialStreamReader(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config());
+        in.append(state.serializedPartialStream.retainedDuplicate());
+        SSTableMultiWriter sstableWriter = state.partialStreamReader.read(in);
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+        in.close();
+        channel.finishAndReleaseAll();
+    }
+
+    private EmbeddedChannel createMockNettyChannel()
+    {
+        EmbeddedChannel channel = new EmbeddedChannel();
+        channel.config().setWriteBufferHighWaterMark(STREAM_SIZE); // avoid blocking
+        return channel;
+    }
+
+    private static class CapturingNettyChannel extends EmbeddedChannel
+    {
+        private final ByteBuf serializedStream;
+        private final WritableByteChannel proxyWBC = new WritableByteChannel()
+        {
+            public int write(ByteBuffer src) throws IOException
+            {
+                int rem = src.remaining();
+                serializedStream.writeBytes(src);
+                return rem;
+            }
+
+            public boolean isOpen()
+            {
+                return true;
+            }
+
+            public void close() throws IOException
+            {
+            }
+        };
+
+        public CapturingNettyChannel(int capacity)
+        {
+            this.serializedStream = alloc().buffer(capacity);
+            this.pipeline().addLast(new ChannelOutboundHandlerAdapter()
+            {
+                @Override
+                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+                {
+                    if (msg instanceof ByteBuf)
+                        serializedStream.writeBytes((ByteBuf) msg);
+                    else if (msg instanceof ByteBuffer)
+                        serializedStream.writeBytes((ByteBuffer) msg);
+                    else if (msg instanceof DefaultFileRegion)
+                        ((DefaultFileRegion) msg).transferTo(proxyWBC, 0);
+                }
+            });
+            config().setWriteBufferHighWaterMark(capacity);
+        }
+
+        public ByteBuf getSerializedStream()
+        {
+            return serializedStream.copy();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index c9dbe14..0632274 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -612,15 +612,15 @@ public class VerifyTest
 
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
 
-        roh.check(dk(1));
-        roh.check(dk(10));
-        roh.check(dk(11));
-        roh.check(dk(21));
-        roh.check(dk(25));
+        roh.validate(dk(1));
+        roh.validate(dk(10));
+        roh.validate(dk(11));
+        roh.validate(dk(21));
+        roh.validate(dk(25));
         boolean gotException = false;
         try
         {
-            roh.check(dk(26));
+            roh.validate(dk(26));
         }
         catch (Throwable t)
         {
@@ -635,9 +635,9 @@ public class VerifyTest
         List<Range<Token>> normalized = new ArrayList<>();
         normalized.add(r(0,10));
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
-        roh.check(dk(1));
+        roh.validate(dk(1));
         // call with smaller token to get exception
-        roh.check(dk(0));
+        roh.validate(dk(0));
     }
 
 
@@ -646,9 +646,9 @@ public class VerifyTest
     {
         List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0,0)));
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
-        roh.check(dk(Long.MIN_VALUE));
-        roh.check(dk(0));
-        roh.check(dk(Long.MAX_VALUE));
+        roh.validate(dk(Long.MIN_VALUE));
+        roh.validate(dk(0));
+        roh.validate(dk(Long.MAX_VALUE));
     }
 
     @Test
@@ -656,12 +656,12 @@ public class VerifyTest
     {
         List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(Long.MAX_VALUE - 1000,Long.MIN_VALUE + 1000)));
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
-        roh.check(dk(Long.MIN_VALUE));
-        roh.check(dk(Long.MAX_VALUE));
+        roh.validate(dk(Long.MIN_VALUE));
+        roh.validate(dk(Long.MAX_VALUE));
         boolean gotException = false;
         try
         {
-            roh.check(dk(26));
+            roh.validate(dk(26));
         }
         catch (Throwable t)
         {
@@ -673,7 +673,7 @@ public class VerifyTest
     @Test
     public void testEmptyRanges()
     {
-        new Verifier.RangeOwnHelper(Collections.emptyList()).check(dk(1));
+        new Verifier.RangeOwnHelper(Collections.emptyList()).validate(dk(1));
     }
 
     private DecoratedKey dk(long l)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
new file mode 100644
index 0000000..947f968
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.ByteBufDataInputPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.NonClosingDefaultFileRegion;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraEntireSSTableStreamWriterTest
+{
+    public static final String KEYSPACE = "CassandraEntireSSTableStreamWriterTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                .caching(CachingParams.CACHE_NOTHING));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore("Standard1");
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void testBlockWriterOverWire() throws IOException
+    {
+        StreamSession session = setupStreamingSessionForTest();
+
+        CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+        EmbeddedChannel channel = new EmbeddedChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024);
+        writer.write(out);
+
+        Queue msgs = channel.outboundMessages();
+
+        assertTrue(msgs.peek() instanceof DefaultFileRegion);
+    }
+
+    @Test
+    public void testBlockReadingAndWritingOverWire() throws Exception
+    {
+        StreamSession session = setupStreamingSessionForTest();
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+
+        CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+        // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
+        ByteBuf serializedFile = Unpooled.buffer(8192);
+        EmbeddedChannel channel = createMockNettyChannel(serializedFile);
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024);
+
+        writer.write(out);
+
+        session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(sstable.descriptor.formatType)
+                                 .withSSTableVersion(sstable.descriptor.version)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(sstable.estimatedKeys())
+                                 .withSections(Collections.emptyList())
+                                 .withSerializationHeader(sstable.header.toComponent())
+                                 .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                 .isEntireSSTable(true)
+                                 .withFirstKey(sstable.first)
+                                 .withTableId(sstable.metadata().id)
+                                 .build();
+
+        CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), 0, 0, 0, null), header, session);
+
+        SSTableMultiWriter sstableWriter = reader.read(new ByteBufDataInputPlus(serializedFile));
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+
+        assertEquals(1, newSstables.size());
+    }
+
+    private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) throws Exception
+    {
+        WritableByteChannel wbc = new WritableByteChannel()
+        {
+            private boolean isOpen = true;
+            public int write(ByteBuffer src) throws IOException
+            {
+                int size = src.limit();
+                serializedFile.writeBytes(src);
+                return size;
+            }
+
+            public boolean isOpen()
+            {
+                return isOpen;
+            }
+
+            public void close() throws IOException
+            {
+                isOpen = false;
+            }
+        };
+
+        return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
+                @Override
+                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+                {
+                    ((NonClosingDefaultFileRegion) msg).transferTo(wbc, 0);
+                    super.write(ctx, msg, promise);
+                }
+            });
+    }
+
+    private StreamSession setupStreamingSessionForTest()
+    {
+        StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+        StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+        session.init(future);
+        return session;
+    }
+}




[3/3] cassandra git commit: Stream entire SSTables when possible

Posted by al...@apache.org.
Stream entire SSTables when possible

patch by Dinesh Joshi; reviewed by Aleksey Yeschenko and Ariel Weisberg
for CASSANDRA-14556


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47a12c52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47a12c52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47a12c52

Branch: refs/heads/trunk
Commit: 47a12c52a313258307ab88392f75c5866d9a2bb1
Parents: 6ba2fb9
Author: Dinesh A. Joshi <di...@apple.com>
Authored: Tue Jul 3 12:07:11 2018 -0700
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Fri Jul 27 17:50:25 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 conf/cassandra.yaml                             |  12 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  14 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../org/apache/cassandra/db/DiskBoundaries.java |  17 +-
 .../cassandra/db/compaction/Verifier.java       |  26 +-
 .../apache/cassandra/db/lifecycle/LogFile.java  |   2 +-
 .../cassandra/db/lifecycle/LogReplicaSet.java   |   3 +-
 .../CassandraCompressedStreamReader.java        | 131 ++++++++
 .../CassandraCompressedStreamWriter.java        | 152 +++++++++
 .../CassandraEntireSSTableStreamReader.java     | 177 ++++++++++
 .../CassandraEntireSSTableStreamWriter.java     | 120 +++++++
 .../db/streaming/CassandraIncomingFile.java     |  17 +-
 .../db/streaming/CassandraOutgoingFile.java     | 132 +++++++-
 .../db/streaming/CassandraStreamHeader.java     | 250 ++++++++++++--
 .../db/streaming/CassandraStreamManager.java    |   4 +-
 .../db/streaming/CassandraStreamReader.java     |   5 +-
 .../db/streaming/ComponentManifest.java         | 130 ++++++++
 .../CompressedCassandraStreamReader.java        | 131 --------
 .../CompressedCassandraStreamWriter.java        | 152 ---------
 .../db/streaming/CompressedInputStream.java     |   4 +-
 .../cassandra/db/streaming/IStreamReader.java   |  32 ++
 .../apache/cassandra/io/sstable/Component.java  |   7 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../format/big/BigTableZeroCopyWriter.java      | 226 +++++++++++++
 .../io/util/BufferedDataOutputStreamPlus.java   |   4 +-
 .../cassandra/io/util/CheckedFunction.java      |  25 ++
 .../cassandra/io/util/DataOutputPlus.java       |   4 +-
 .../io/util/RebufferingInputStream.java         |   4 +-
 .../cassandra/io/util/SequentialWriter.java     |  17 +-
 .../io/util/UnbufferedDataOutputStreamPlus.java |   2 +-
 .../net/async/ByteBufDataOutputPlus.java        |   5 +-
 .../net/async/ByteBufDataOutputStreamPlus.java  |  60 +++-
 .../net/async/NonClosingDefaultFileRegion.java  |  51 +++
 .../async/RebufferingByteBufDataInputPlus.java  |  39 +++
 .../cassandra/streaming/StreamCoordinator.java  |   3 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   5 +-
 .../cassandra/streaming/StreamResultFuture.java |  13 +-
 .../cassandra/streaming/StreamSession.java      |   6 +-
 .../async/NettyStreamingMessageSender.java      |   2 +-
 .../async/StreamingInboundHandler.java          |   2 +-
 .../streaming/messages/StreamInitMessage.java   |   7 +-
 .../org/apache/cassandra/utils/Collectors3.java |  54 +++
 test/conf/cassandra.yaml                        |   2 +
 .../cassandra/streaming/LongStreamingTest.java  |   6 +-
 .../microbench/ZeroCopyStreamingBenchmark.java  | 329 +++++++++++++++++++
 .../org/apache/cassandra/db/VerifyTest.java     |  30 +-
 .../CassandraEntireSSTableStreamWriterTest.java | 209 ++++++++++++
 .../db/streaming/CassandraOutgoingFileTest.java | 145 ++++++++
 .../db/streaming/CassandraStreamHeaderTest.java |  62 +++-
 .../db/streaming/ComponentManifestTest.java     |  64 ++++
 .../io/sstable/BigTableWriterTest.java          |   5 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +
 .../format/big/BigTableZeroCopyWriterTest.java  | 208 ++++++++++++
 .../RebufferingByteBufDataInputPlusTest.java    |  98 ++++++
 .../serializers/SerializationUtils.java         |   1 -
 .../streaming/StreamTransferTaskTest.java       |   4 +-
 .../streaming/StreamingTransferTest.java        |   1 +
 60 files changed, 2809 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b6418c..6ede70e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Stream entire SSTables when possible (CASSANDRA-14556)
  * Add experimental support for Java 11 (CASSANDRA-9608)
  * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580)
  * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 75885e9..3fab849 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -91,6 +91,9 @@ New features
      statements after the cache has been refreshed. CASSANDRA-13985
    - Support for audit logging of database activity. If enabled, logs every incoming
      CQL command request, Authentication (successful as well as unsuccessful login) to a node.
+   - Faster streaming of entire SSTables using zero-copy APIs. If enabled, Cassandra will stream
+     entire SSTables, significantly speeding up transfers. Any streaming-related operation will see
+     a corresponding improvement. See CASSANDRA-14556.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 439b85a..663daaa 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -788,6 +788,18 @@ compaction_throughput_mb_per_sec: 16
 # between the sstables, reducing page cache churn and keeping hot rows hot
 sstable_preemptive_open_interval_in_mb: 50
 
+# When enabled, permits Cassandra to zero-copy stream entire eligible
+# SSTables between nodes, including every component.
+# This speeds up network transfers significantly, subject to the
+# throttling specified by stream_throughput_outbound_megabits_per_sec.
+# Enabling this will reduce the GC pressure on the sending and receiving nodes.
+# When unset, the default is enabled. While this feature tries to keep the
+# disks balanced, it cannot guarantee it. This feature will be automatically
+# disabled if internode encryption is enabled. Currently this can be used with
+# Leveled Compaction. Once CASSANDRA-14586 is fixed, other compaction strategies
+# will benefit as well when used in combination with CASSANDRA-6696.
+# stream_entire_sstables: true
+
 # Throttles all outbound streaming file transfers on this node to the
 # given total throughput in Mbps. This is necessary because Cassandra does
 # mostly sequential IO when streaming data during bootstrap or repair, which
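
The new stream_entire_sstables option ships commented out because it already
defaults to true (see the Config.java change below). To opt out explicitly one
would uncomment it and set, for example:

    stream_entire_sstables: false

Note that DatabaseDescriptor (below) still forces the option off at startup when
internode encryption is enabled, regardless of what the yaml says.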

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0d4760e..3a7ff0d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -381,6 +381,7 @@ public class Config
     public int block_for_peers_timeout_in_secs = 10;
     public volatile boolean automatic_sstable_upgrade = false;
     public volatile int max_concurrent_automatic_sstable_upgrades = 1;
+    public boolean stream_entire_sstables = true;
 
     public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6301ab0..366dac7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -714,6 +714,15 @@ public class DatabaseDescriptor
                                             "server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false);
         }
 
+        if (conf.stream_entire_sstables)
+        {
+            if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional)
+            {
+                logger.warn("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming.");
+                conf.stream_entire_sstables = false;
+            }
+        }
+
         if (conf.max_value_size_in_mb <= 0)
             throw new ConfigurationException("max_value_size_in_mb must be positive", false);
         else if (conf.max_value_size_in_mb >= 2048)
@@ -2274,6 +2283,11 @@ public class DatabaseDescriptor
         return conf.streaming_connections_per_host;
     }
 
+    public static boolean streamEntireSSTables()
+    {
+        return conf.stream_entire_sstables;
+    }
+
     public static String getLocalDataCenter()
     {
         return localDC;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9c4921e..f03ffe6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -791,7 +791,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return newSSTableDescriptor(directory, format.info.getLatestVersion(), format);
     }
 
-    private Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format)
+    public Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format)
     {
         return new Descriptor(version,
                               directory,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index 086bc84..22f17b0 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -129,4 +129,19 @@ public class DiskBoundaries
     {
         return directories.get(getDiskIndex(sstable));
     }
-}
\ No newline at end of file
+
+    public Directories.DataDirectory getCorrectDiskForKey(DecoratedKey key)
+    {
+        if (positions == null)
+            return null;
+
+        return directories.get(getDiskIndex(key));
+    }
+
+    private int getDiskIndex(DecoratedKey key)
+    {
+        int pos = Collections.binarySearch(positions, key);
+        assert pos < 0;
+        return -pos - 1;
+    }
+}
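
The new getDiskIndex(DecoratedKey) relies on Collections.binarySearch returning
(-(insertionPoint) - 1) when the key is not an exact boundary (the assert pos < 0
reflects that a key never compares equal to a boundary position), so -pos - 1 is
the index of the first boundary greater than the key, i.e. the disk that owns it.
A minimal, self-contained sketch of just that arithmetic, using made-up long
boundaries instead of PartitionPositions:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DiskIndexSketch
    {
        public static void main(String[] args)
        {
            List<Long> boundaries = Arrays.asList(100L, 200L, 300L); // hypothetical per-disk upper bounds
            long key = 150L;                                         // hypothetical key token
            int pos = Collections.binarySearch(boundaries, key);     // -2: would be inserted at index 1
            int diskIndex = -pos - 1;                                 // 1 -> second data directory
            System.out.println(diskIndex);
        }
    }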

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index bc9679d..db49369 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -180,7 +180,7 @@ public class Verifier implements Closeable
                 while (iter.hasNext())
                 {
                     DecoratedKey key = iter.next();
-                    rangeOwnHelper.check(key);
+                    rangeOwnHelper.validate(key);
                 }
             }
             catch (Throwable t)
@@ -262,7 +262,7 @@ public class Verifier implements Closeable
                 {
                     try
                     {
-                        rangeOwnHelper.check(key);
+                        rangeOwnHelper.validate(key);
                     }
                     catch (Throwable t)
                     {
@@ -360,13 +360,27 @@ public class Verifier implements Closeable
          * @param key the key
          * @throws RuntimeException if the key is not contained
          */
-        public void check(DecoratedKey key)
+        public void validate(DecoratedKey key)
+        {
+            if (!check(key))
+                throw new RuntimeException("Key " + key + " is not contained in the given ranges");
+        }
+
+        /**
+         * Check whether the given key is contained in any of the given ranges.
+         *
+         * Must be called in sorted order - keys should be increasing.
+         *
+         * @param key the key
+         * @return true if the key is contained in one of the ranges, false otherwise
+         */
+        public boolean check(DecoratedKey key)
         {
             assert lastKey == null || key.compareTo(lastKey) > 0;
             lastKey = key;
 
             if (normalizedRanges.isEmpty()) // handle tests etc where we don't have any ranges
-                return;
+                return true;
 
             if (rangeIndex > normalizedRanges.size() - 1)
                 throw new IllegalStateException("RangeOwnHelper can only be used to find the first out-of-range-token");
@@ -375,8 +389,10 @@ public class Verifier implements Closeable
             {
                 rangeIndex++;
                 if (rangeIndex > normalizedRanges.size() - 1)
-                    throw new RuntimeException("Key "+key+" is not contained in the given ranges");
+                    return false;
             }
+
+            return true;
         }
     }
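
With this change validate(key) keeps the old throwing behaviour while check(key)
merely reports ownership, which is what CassandraOutgoingFile (below) uses to
decide whether an sstable is fully contained in the requested ranges. A minimal
sketch of the two forms, written as it could appear inside VerifyTest (r() and
dk() are that test's existing helpers; the range bounds are made up):

    List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0, 1000)));
    Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);

    roh.validate(dk(5));                 // contained: returns normally, throws otherwise
    boolean owned = roh.check(dk(2000)); // not contained: returns false instead of throwing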
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index af6f435..98be0a0 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -66,7 +66,7 @@ final class LogFile implements AutoCloseable
     private final LogReplicaSet replicas = new LogReplicaSet();
 
     // The transaction records, this set must be ORDER PRESERVING
-    private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+    private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
 
     // The type of the transaction
     private final OperationType type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 65be285..f5423d6 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +46,7 @@ public class LogReplicaSet implements AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(LogReplicaSet.class);
 
-    private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
+    private final Map<File, LogReplica> replicasByFile = Collections.synchronizedMap(new LinkedHashMap<>()); // TODO: Hack until we fix CASSANDRA-14554
 
     private Collection<LogReplica> replicas()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
new file mode 100644
index 0000000..eb993ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
+/**
+ * CassandraStreamReader that reads from streamed compressed SSTable
+ */
+public class CassandraCompressedStreamReader extends CassandraStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamReader.class);
+
+    protected final CompressionInfo compressionInfo;
+
+    public CassandraCompressedStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        super(header, streamHeader, session);
+        this.compressionInfo = streamHeader.compressionInfo;
+    }
+
+    /**
+     * @return SSTable transferred
+     * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+     */
+    @Override
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    {
+        long totalSize = totalSize();
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("CF " + tableId + " was dropped during streaming");
+        }
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
+                     cfs.getTableName());
+
+        StreamDeserializer deserializer = null;
+        SSTableMultiWriter writer = null;
+        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
+        {
+            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
+            String filename = writer.getFilename();
+            int sectionIdx = 0;
+            for (SSTableReader.PartitionPositionBounds section : sections)
+            {
+                assert cis.getTotalCompressedBytesRead() <= totalSize;
+                long sectionLength = section.upperPosition - section.lowerPosition;
+
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
+                // skip to beginning of section inside chunk
+                cis.position(section.lowerPosition);
+                in.reset(0);
+
+                while (in.getBytesRead() < sectionLength)
+                {
+                    writePartition(deserializer, writer);
+                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
+                    session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
+            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
+            if (writer != null)
+            {
+                writer.abort(e);
+            }
+            if (extractIOExceptionCause(e).isPresent())
+                throw e;
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
new file mode 100644
index 0000000..3b971f8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraStreamWriter for compressed SSTable.
+ */
+public class CassandraCompressedStreamWriter extends CassandraStreamWriter
+{
+    private static final int CHUNK_SIZE = 1 << 16;
+
+    private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
+
+    private final CompressionInfo compressionInfo;
+
+    public CassandraCompressedStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
+    {
+        super(sstable, sections, session);
+        this.compressionInfo = compressionInfo;
+    }
+
+    @Override
+    public void write(DataOutputStreamPlus out) throws IOException
+    {
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
+        long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
+        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+        {
+            long progress = 0L;
+            // calculate chunks to transfer. we want to send continuous chunks altogether.
+            List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
+            // stream each of the required sections of the file
+            for (final SSTableReader.PartitionPositionBounds section : sections)
+            {
+                // length of the section to stream
+                long length = section.upperPosition - section.lowerPosition;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
+                // tracks write progress
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                {
+                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    limiter.acquire(toTransfer);
+
+                    ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
+                    long lastWrite;
+                    try
+                    {
+                        lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred);
+                        assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
+                        outBuffer.flip();
+                        output.writeToChannel(outBuffer);
+                    }
+                    catch (IOException e)
+                    {
+                        FileUtils.clean(outBuffer);
+                        throw e;
+                    }
+
+                    bytesTransferred += lastWrite;
+                    progress += lastWrite;
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+
+    // chunks are assumed to be sorted by offset
+    private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
+    {
+        List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
+        SSTableReader.PartitionPositionBounds lastSection = null;
+        for (CompressionMetadata.Chunk chunk : chunks)
+        {
+            if (lastSection != null)
+            {
+                if (chunk.offset == lastSection.upperPosition)
+                {
+                    // extend previous section to end of this chunk
+                    lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+                }
+                else
+                {
+                    transferSections.add(lastSection);
+                    lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
+                }
+            }
+            else
+            {
+                lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
+            }
+        }
+        if (lastSection != null)
+            transferSections.add(lastSection);
+        return transferSections;
+    }
+}
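
To make the chunk merging in getTransferSections concrete: with hypothetical
chunks of (offset, length) = (0, 100), (104, 100) and (300, 50), the first chunk
opens a section [0, 104) (100 bytes plus the 4-byte CRC), the second starts
exactly at 104 so the section is extended to [0, 208), and the third is not
contiguous, so the result is two transfer sections, [0, 208) and [300, 354).
totalSize() for the same chunks would be 262 bytes.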

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
new file mode 100644
index 0000000..6f8f06a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraEntireSSTableStreamReader reads SSTable off the wire and writes it to disk.
+ */
+public class CassandraEntireSSTableStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamReader.class);
+
+    private final TableId tableId;
+    private final StreamSession session;
+    private final CassandraStreamHeader header;
+    private final int fileSequenceNumber;
+
+    public CassandraEntireSSTableStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        if (streamHeader.format != SSTableFormat.Type.BIG)
+            throw new AssertionError("Unsupported SSTable format " + streamHeader.format);
+
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair sstables if the session has a pending repair id
+            if (!session.getPendingRepair().equals(messageHeader.pendingRepair))
+                throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
+        }
+
+        this.header = streamHeader;
+        this.session = session;
+        this.tableId = messageHeader.tableId;
+        this.fileSequenceNumber = messageHeader.sequenceNumber;
+    }
+
+    /**
+     * @param in where this reads data from
+     * @return SSTable transferred
+     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+     */
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    @Override
+    public SSTableMultiWriter read(DataInputPlus in) throws IOException
+    {
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("Table " + tableId + " was dropped during streaming");
+        }
+
+        ComponentManifest manifest = header.componentManifest;
+        long totalSize = manifest.totalSize();
+
+        logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}",
+                     session.planId(),
+                     fileSequenceNumber,
+                     session.peer,
+                     prettyPrintMemory(totalSize),
+                     cfs.metadata());
+
+        BigTableZeroCopyWriter writer = null;
+
+        try
+        {
+            writer = createWriter(cfs, totalSize, manifest.components());
+            long bytesRead = 0;
+            for (Component component : manifest.components())
+            {
+                long length = manifest.sizeOf(component);
+
+                logger.debug("[Stream #{}] Started receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}",
+                             session.planId(),
+                             component,
+                             session.peer,
+                             prettyPrintMemory(length),
+                             prettyPrintMemory(bytesRead),
+                             prettyPrintMemory(totalSize));
+
+                writer.writeComponent(component.type, in, length);
+                session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length);
+                bytesRead += length;
+
+                logger.debug("[Stream #{}] Finished receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}",
+                             session.planId(),
+                             component,
+                             session.peer,
+                             prettyPrintMemory(length),
+                             prettyPrintMemory(bytesRead),
+                             prettyPrintMemory(totalSize));
+            }
+
+            writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, header.sstableLevel);
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            logger.error("[Stream {}] Error while reading sstable from stream for table = {}", session.planId(), cfs.metadata(), e);
+            if (writer != null)
+                e = writer.abort(e);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException
+    {
+        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+        if (localDir == null)
+            throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize)));
+
+        File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));
+
+        if (dir == null)
+            return cfs.getDirectories().getDirectoryForNewSSTables();
+
+        return dir;
+    }
+
+    @SuppressWarnings("resource")
+    protected BigTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long totalSize, Collection<Component> components) throws IOException
+    {
+        File dataDir = getDataDir(cfs, totalSize);
+
+        StreamReceiver streamReceiver = session.getAggregator(tableId);
+        assert streamReceiver instanceof CassandraStreamReceiver;
+
+        LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
+
+        Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version, header.format);
+
+        logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), desc.filenameFor(Component.DATA), components);
+
+        return new BigTableZeroCopyWriter(desc, cfs.metadata, txn, components);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
new file mode 100644
index 0000000..7a20110
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraEntireSSTableStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraEntireSSTableStreamWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamWriter.class);
+
+    private final SSTableReader sstable;
+    private final ComponentManifest manifest;
+    private final StreamSession session;
+    private final StreamRateLimiter limiter;
+
+    public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest)
+    {
+        this.session = session;
+        this.sstable = sstable;
+        this.manifest = manifest;
+        this.limiter = StreamManager.getRateLimiter(session.peer);
+    }
+
+    /**
+     * Stream the entire file to given channel.
+     * <p>
+     *
+     * @param out where this writes data to
+     * @throws IOException on any I/O error
+     */
+    public void write(ByteBufDataOutputStreamPlus out) throws IOException
+    {
+        long totalSize = manifest.totalSize();
+        logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}",
+                     session.planId(),
+                     sstable.getFilename(),
+                     session.peer,
+                     sstable.getSSTableMetadata().repairedAt,
+                     prettyPrintMemory(totalSize));
+
+        long progress = 0L;
+
+        for (Component component : manifest.components())
+        {
+            @SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus
+            FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+            // Total Length to transmit for this file
+            long length = in.size();
+
+            // tracks write progress
+            logger.debug("[Stream #{}] Streaming {}.{} gen {} component {} size {}", session.planId(),
+                         sstable.getKeyspaceName(),
+                         sstable.getColumnFamilyName(),
+                         sstable.descriptor.generation,
+                         component,
+                         prettyPrintMemory(length));
+
+            long bytesWritten = out.writeToChannel(in, limiter);
+            progress += bytesWritten;
+
+            session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, length);
+
+            logger.debug("[Stream #{}] Finished streaming {}.{} gen {} component {} to {}, xfered = {}, length = {}, totalSize = {}",
+                         session.planId(),
+                         sstable.getKeyspaceName(),
+                         sstable.getColumnFamilyName(),
+                         sstable.descriptor.generation,
+                         component,
+                         session.peer,
+                         prettyPrintMemory(bytesWritten),
+                         prettyPrintMemory(length),
+                         prettyPrintMemory(totalSize));
+        }
+
+        out.flush();
+
+        logger.debug("[Stream #{}] Finished streaming sstable {} to {}, xfered = {}, totalSize = {}",
+                     session.planId(),
+                     sstable.getFilename(),
+                     session.peer,
+                     prettyPrintMemory(progress),
+                     prettyPrintMemory(totalSize));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 16698e5..c65ca62 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -45,6 +47,8 @@ public class CassandraIncomingFile implements IncomingStream
     private volatile SSTableMultiWriter sstable;
     private volatile long size = -1;
 
+    private static final Logger logger = LoggerFactory.getLogger(CassandraIncomingFile.class);
+
     public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, StreamMessageHeader header)
     {
         this.cfs = cfs;
@@ -56,9 +60,16 @@ public class CassandraIncomingFile implements IncomingStream
     public synchronized void read(DataInputPlus in, int version) throws IOException
     {
         CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version);
-        CassandraStreamReader reader = !streamHeader.isCompressed()
-                                       ? new CassandraStreamReader(header, streamHeader, session)
-                                       : new CompressedCassandraStreamReader(header, streamHeader, session);
+        logger.debug("Incoming stream entireSSTable={} components={}", streamHeader.isEntireSSTable, streamHeader.componentManifest);
+
+        IStreamReader reader;
+        if (streamHeader.isEntireSSTable)
+            reader = new CassandraEntireSSTableStreamReader(header, streamHeader, session);
+        else if (streamHeader.isCompressed())
+            reader = new CassandraCompressedStreamReader(header, streamHeader, session);
+        else
+            reader = new CassandraStreamReader(header, streamHeader, session);
+
         size = streamHeader.size();
         sstable = reader.read(in);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 6ec1f85..5252187 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -18,51 +18,100 @@
 
 package org.apache.cassandra.db.streaming;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.OutgoingStream;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.concurrent.Ref;
 
+import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
+
 /**
  * used to transfer part (or the whole) of an SSTable data file
  */
 public class CassandraOutgoingFile implements OutgoingStream
 {
+    public static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                             Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                             Component.DIGEST, Component.CRC);
+
     private final Ref<SSTableReader> ref;
     private final long estimatedKeys;
     private final List<SSTableReader.PartitionPositionBounds> sections;
     private final String filename;
     private final CassandraStreamHeader header;
     private final boolean keepSSTableLevel;
+    private final ComponentManifest manifest;
+    private Boolean isFullyContained;
 
-    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, long estimatedKeys)
+    private final List<Range<Token>> ranges;
+
+    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
+                                 List<SSTableReader.PartitionPositionBounds> sections, Collection<Range<Token>> ranges,
+                                 long estimatedKeys)
     {
         Preconditions.checkNotNull(ref.get());
         this.ref = ref;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
+        this.ranges = ImmutableList.copyOf(ranges);
         this.filename = ref.get().getFilename();
+        this.manifest = getComponentManifest(ref.get());
 
         SSTableReader sstable = ref.get();
         keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
-        this.header = new CassandraStreamHeader(sstable.descriptor.version,
-                                                sstable.descriptor.formatType,
-                                                estimatedKeys,
-                                                sections,
-                                                sstable.compression ? sstable.getCompressionMetadata() : null,
-                                                keepSSTableLevel ? sstable.getSSTableLevel() : 0,
-                                                sstable.header.toComponent());
+        this.header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(sstable.descriptor.formatType)
+                                 .withSSTableVersion(sstable.descriptor.version)
+                                 .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
+                                 .withEstimatedKeys(estimatedKeys)
+                                 .withSections(sections)
+                                 .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
+                                 .withSerializationHeader(sstable.header.toComponent())
+                                 .isEntireSSTable(shouldStreamEntireSSTable())
+                                 .withComponentManifest(manifest)
+                                 .withFirstKey(sstable.first)
+                                 .withTableId(sstable.metadata().id)
+                                 .build();
+    }
+
+    @VisibleForTesting
+    public static ComponentManifest getComponentManifest(SSTableReader sstable)
+    {
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
+        for (Component component : STREAM_COMPONENTS)
+        {
+            File file = new File(sstable.descriptor.filenameFor(component));
+            if (file.exists())
+                components.put(component, file.length());
+        }
+
+        return new ComponentManifest(components);
     }
 
     public static CassandraOutgoingFile fromStream(OutgoingStream stream)
@@ -114,11 +163,68 @@ public class CassandraOutgoingFile implements OutgoingStream
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();
 
-        CassandraStreamWriter writer = header.compressionInfo == null ?
-                                       new CassandraStreamWriter(sstable, header.sections, session) :
-                                       new CompressedCassandraStreamWriter(sstable, header.sections,
-                                                                           header.compressionInfo, session);
-        writer.write(out);
+        if (shouldStreamEntireSSTable() && out instanceof ByteBufDataOutputStreamPlus)
+        {
+            CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
+            writer.write((ByteBufDataOutputStreamPlus) out);
+        }
+        else
+        {
+            CassandraStreamWriter writer = (header.compressionInfo == null) ?
+                     new CassandraStreamWriter(sstable, header.sections, session) :
+                     new CassandraCompressedStreamWriter(sstable, header.sections,
+                                                         header.compressionInfo, session);
+            writer.write(out);
+        }
+    }
+
+    @VisibleForTesting
+    public boolean shouldStreamEntireSSTable()
+    {
+        // don't stream if full sstable transfers are disabled or legacy counter shards are present
+        if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards)
+            return false;
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
+
+        if (cfs == null)
+            return false;
+
+        AbstractCompactionStrategy compactionStrategy = cfs.getCompactionStrategyManager()
+                                                           .getCompactionStrategyFor(ref.get());
+
+        if (compactionStrategy instanceof LeveledCompactionStrategy)
+            return contained(ranges, ref.get());
+
+        return false;
+    }
+
+    @VisibleForTesting
+    public boolean contained(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    {
+        if (isFullyContained != null)
+            return isFullyContained;
+
+        isFullyContained = computeContainment(normalizedRanges, sstable);
+        return isFullyContained;
+    }
+
+    private boolean computeContainment(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    {
+        if (normalizedRanges == null)
+            return false;
+
+        RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            while (iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                if (!rangeOwnHelper.check(key))
+                    return false;
+            }
+        }
+        return true;
     }
 
     @Override
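
contained() above walks the sstable's primary index with a KeyIterator and checks every partition key against Verifier.RangeOwnHelper, which assumes the ranges it is given are normalized and that keys arrive in sorted order. Conceptually the check is equivalent to the stand-alone sketch below, which swaps RangeOwnHelper for a direct Range.contains() test per key, so it is O(keys x ranges) and meant only to illustrate the semantics:

    import java.util.List;

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.io.sstable.KeyIterator;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class ContainmentSketch
    {
        // Illustrative equivalent of CassandraOutgoingFile.contained(): true only if every
        // partition key in the sstable is owned by at least one of the requested ranges.
        static boolean fullyContained(SSTableReader sstable, List<Range<Token>> ranges)
        {
            try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
            {
                while (iter.hasNext())
                {
                    DecoratedKey key = iter.next();
                    boolean owned = false;
                    for (Range<Token> range : ranges)
                        owned |= range.contains(key.getToken());
                    if (!owned)
                        return false;
                }
            }
            return true;
        }
    }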

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
index 43631b0..2af56de 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -15,16 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -32,6 +38,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public class CassandraStreamHeader
 {
@@ -50,33 +60,38 @@ public class CassandraStreamHeader
     private final CompressionMetadata compressionMetadata;
     public volatile CompressionInfo compressionInfo;
     public final int sstableLevel;
-    public final SerializationHeader.Component header;
+    public final SerializationHeader.Component serializationHeader;
+
+    /* flag indicating whether this is a partial or entire sstable transfer */
+    public final boolean isEntireSSTable;
+    /* first key of the sstable; only present for entire-sstable transfers */
+    public final DecoratedKey firstKey;
+    public final TableId tableId;
+    public final ComponentManifest componentManifest;
 
     /* cached size value */
     private transient final long size;
 
-    private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
-    {
-        this.version = version;
-        this.format = format;
-        this.estimatedKeys = estimatedKeys;
-        this.sections = sections;
-        this.compressionMetadata = compressionMetadata;
-        this.compressionInfo = compressionInfo;
-        this.sstableLevel = sstableLevel;
-        this.header = header;
-
-        this.size = calculateSize();
-    }
-
-    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header)
+    private CassandraStreamHeader(Builder builder)
     {
-        this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header);
+        version = builder.version;
+        format = builder.format;
+        estimatedKeys = builder.estimatedKeys;
+        sections = builder.sections;
+        compressionMetadata = builder.compressionMetadata;
+        compressionInfo = builder.compressionInfo;
+        sstableLevel = builder.sstableLevel;
+        serializationHeader = builder.serializationHeader;
+        tableId = builder.tableId;
+        isEntireSSTable = builder.isEntireSSTable;
+        componentManifest = builder.componentManifest;
+        firstKey = builder.firstKey;
+        size = calculateSize();
     }
 
-    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
+    public static Builder builder()
     {
-        this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header);
+        return new Builder();
     }
 
     public boolean isCompressed()
@@ -94,6 +109,9 @@ public class CassandraStreamHeader
 
     private long calculateSize()
     {
+        if (isEntireSSTable)
+            return componentManifest.totalSize();
+
         long transferSize = 0;
         if (compressionInfo != null)
         {
@@ -112,9 +130,7 @@ public class CassandraStreamHeader
     public synchronized void calculateCompressionInfo()
     {
         if (compressionMetadata != null && compressionInfo == null)
-        {
             compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections);
-        }
     }
 
     @Override
@@ -125,9 +141,11 @@ public class CassandraStreamHeader
                ", format=" + format +
                ", estimatedKeys=" + estimatedKeys +
                ", sections=" + sections +
-               ", compressionInfo=" + compressionInfo +
                ", sstableLevel=" + sstableLevel +
-               ", header=" + header +
+               ", header=" + serializationHeader +
+               ", isEntireSSTable=" + isEntireSSTable +
+               ", firstKey=" + firstKey +
+               ", tableId=" + tableId +
                '}';
     }
 
@@ -138,20 +156,26 @@ public class CassandraStreamHeader
         CassandraStreamHeader that = (CassandraStreamHeader) o;
         return estimatedKeys == that.estimatedKeys &&
                sstableLevel == that.sstableLevel &&
+               isEntireSSTable == that.isEntireSSTable &&
                Objects.equals(version, that.version) &&
                format == that.format &&
                Objects.equals(sections, that.sections) &&
                Objects.equals(compressionInfo, that.compressionInfo) &&
-               Objects.equals(header, that.header);
+               Objects.equals(serializationHeader, that.serializationHeader) &&
+               Objects.equals(componentManifest, that.componentManifest) &&
+               Objects.equals(firstKey, that.firstKey) &&
+               Objects.equals(tableId, that.tableId);
     }
 
     public int hashCode()
     {
-        return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+        return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest,
+                            isEntireSSTable, firstKey, tableId);
     }
 
+    public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer();
 
-    public static final IVersionedSerializer<CassandraStreamHeader> serializer = new IVersionedSerializer<CassandraStreamHeader>()
+    public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader>
     {
         public void serialize(CassandraStreamHeader header, DataOutputPlus out, int version) throws IOException
         {
@@ -168,11 +192,33 @@ public class CassandraStreamHeader
             header.calculateCompressionInfo();
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
             out.writeInt(header.sstableLevel);
-            SerializationHeader.serializer.serialize(header.version, header.header, out);
+
+            SerializationHeader.serializer.serialize(header.version, header.serializationHeader, out);
+
+            header.tableId.serialize(out);
+            out.writeBoolean(header.isEntireSSTable);
+
+            if (header.isEntireSSTable)
+            {
+                ComponentManifest.serializer.serialize(header.componentManifest, out, version);
+                ByteBufferUtil.writeWithVIntLength(header.firstKey.getKey(), out);
+            }
         }
 
         public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException
         {
+            return deserialize(in, version, tableId -> {
+                ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+                if (cfs != null)
+                    return cfs.getPartitioner();
+
+                return null;
+            });
+        }
+
+        @VisibleForTesting
+        public CassandraStreamHeader deserialize(DataInputPlus in, int version, Function<TableId, IPartitioner> partitionerMapper) throws IOException
+        {
             Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
             SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
 
@@ -183,9 +229,36 @@ public class CassandraStreamHeader
                 sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
             int sstableLevel = in.readInt();
+
             SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-            return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+            TableId tableId = TableId.deserialize(in);
+            boolean isEntireSSTable = in.readBoolean();
+            ComponentManifest manifest = null;
+            DecoratedKey firstKey = null;
+
+            if (isEntireSSTable)
+            {
+                manifest = ComponentManifest.serializer.deserialize(in, version);
+                ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in);
+                IPartitioner partitioner = partitionerMapper.apply(tableId);
+                if (partitioner == null)
+                    throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId %s", tableId));
+                firstKey = partitioner.decorateKey(keyBuf);
+            }
+
+            return builder().withSSTableFormat(format)
+                            .withSSTableVersion(sstableVersion)
+                            .withSSTableLevel(sstableLevel)
+                            .withEstimatedKeys(estimatedKeys)
+                            .withSections(sections)
+                            .withCompressionInfo(compressionInfo)
+                            .withSerializationHeader(header)
+                            .withComponentManifest(manifest)
+                            .isEntireSSTable(isEntireSSTable)
+                            .withFirstKey(firstKey)
+                            .withTableId(tableId)
+                            .build();
         }
 
         public long serializedSize(CassandraStreamHeader header, int version)
@@ -201,12 +274,127 @@ public class CassandraStreamHeader
                 size += TypeSizes.sizeof(section.lowerPosition);
                 size += TypeSizes.sizeof(section.upperPosition);
             }
+
+            header.calculateCompressionInfo();
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
             size += TypeSizes.sizeof(header.sstableLevel);
 
-            size += SerializationHeader.serializer.serializedSize(header.version, header.header);
+            size += SerializationHeader.serializer.serializedSize(header.version, header.serializationHeader);
+
+            size += header.tableId.serializedSize();
+            size += TypeSizes.sizeof(header.isEntireSSTable);
 
+            if (header.isEntireSSTable)
+            {
+                size += ComponentManifest.serializer.serializedSize(header.componentManifest, version);
+                size += ByteBufferUtil.serializedSizeWithVIntLength(header.firstKey.getKey());
+            }
             return size;
         }
-    };
+    }
+
+    public static final class Builder
+    {
+        private Version version;
+        private SSTableFormat.Type format;
+        private long estimatedKeys;
+        private List<SSTableReader.PartitionPositionBounds> sections;
+        private CompressionMetadata compressionMetadata;
+        private CompressionInfo compressionInfo;
+        private int sstableLevel;
+        private SerializationHeader.Component serializationHeader;
+        private ComponentManifest componentManifest;
+        private boolean isEntireSSTable;
+        private DecoratedKey firstKey;
+        private TableId tableId;
+
+        public Builder withSSTableFormat(SSTableFormat.Type format)
+        {
+            this.format = format;
+            return this;
+        }
+
+        public Builder withSSTableVersion(Version version)
+        {
+            this.version = version;
+            return this;
+        }
+
+        public Builder withSSTableLevel(int sstableLevel)
+        {
+            this.sstableLevel = sstableLevel;
+            return this;
+        }
+
+        public Builder withEstimatedKeys(long estimatedKeys)
+        {
+            this.estimatedKeys = estimatedKeys;
+            return this;
+        }
+
+        public Builder withSections(List<SSTableReader.PartitionPositionBounds> sections)
+        {
+            this.sections = sections;
+            return this;
+        }
+
+        public Builder withCompressionMetadata(CompressionMetadata compressionMetadata)
+        {
+            this.compressionMetadata = compressionMetadata;
+            return this;
+        }
+
+        public Builder withCompressionInfo(CompressionInfo compressionInfo)
+        {
+            this.compressionInfo = compressionInfo;
+            return this;
+        }
+
+        public Builder withSerializationHeader(SerializationHeader.Component header)
+        {
+            this.serializationHeader = header;
+            return this;
+        }
+
+        public Builder withTableId(TableId tableId)
+        {
+            this.tableId = tableId;
+            return this;
+        }
+
+        public Builder isEntireSSTable(boolean isEntireSSTable)
+        {
+            this.isEntireSSTable = isEntireSSTable;
+            return this;
+        }
+
+        public Builder withComponentManifest(ComponentManifest componentManifest)
+        {
+            this.componentManifest = componentManifest;
+            return this;
+        }
+
+        public Builder withFirstKey(DecoratedKey firstKey)
+        {
+            this.firstKey = firstKey;
+            return this;
+        }
+
+        public CassandraStreamHeader build()
+        {
+            checkNotNull(version);
+            checkNotNull(format);
+            checkNotNull(sections);
+            checkNotNull(serializationHeader);
+            checkNotNull(tableId);
+
+            if (isEntireSSTable)
+            {
+                checkNotNull(componentManifest);
+                checkNotNull(firstKey);
+            }
+
+            return new CassandraStreamHeader(this);
+        }
+    }
 }
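
Deserializing a header for an entire-sstable transfer now needs a partitioner to re-decorate firstKey; the default deserialize() looks it up via ColumnFamilyStore, while the @VisibleForTesting overload lets a caller supply it directly. A hedged sketch of driving that overload, for example from a test (Murmur3Partitioner and MessagingService.current_version are just convenient stand-ins, and `in` is assumed to be a DataInputPlus positioned at a serialized header):

    CassandraStreamHeader.CassandraStreamHeaderSerializer serializer =
        new CassandraStreamHeader.CassandraStreamHeaderSerializer();

    // Any IPartitioner works; the mapper is only consulted when isEntireSSTable was set.
    CassandraStreamHeader header =
        serializer.deserialize(in, MessagingService.current_version,
                               tableId -> Murmur3Partitioner.instance);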

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index 673b62c..43667d0 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.streaming.StreamReceiver;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.TableStreamManager;
 import org.apache.cassandra.streaming.messages.StreamMessageHeader;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -152,7 +151,8 @@ public class CassandraStreamManager implements TableStreamManager
                     ref.release();
                     continue;
                 }
-                streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, sstable.estimatedKeysForRanges(ranges)));
+                streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, ranges,
+                                                      sstable.estimatedKeysForRanges(ranges)));
             }
 
             return streams;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 3930196..fccabfe 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.FBUtilities;
 /**
  * CassandraStreamReader reads from stream and writes to SSTable.
  */
-public class CassandraStreamReader
+public class CassandraStreamReader implements IStreamReader
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class);
     protected final TableId tableId;
@@ -85,7 +85,7 @@ public class CassandraStreamReader
         this.pendingRepair = header.pendingRepair;
         this.format = streamHeader.format;
         this.sstableLevel = streamHeader.sstableLevel;
-        this.header = streamHeader.header;
+        this.header = streamHeader.serializationHeader;
         this.fileSeqNum = header.sequenceNumber;
     }
 
@@ -95,6 +95,7 @@ public class CassandraStreamReader
      * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
     @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    @Override
     public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
     {
         long totalSize = totalSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
new file mode 100644
index 0000000..90e3dbd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+    private final LinkedHashMap<Component, Long> components;
+
+    public ComponentManifest(Map<Component, Long> components)
+    {
+        this.components = new LinkedHashMap<>(components);
+    }
+
+    public long sizeOf(Component component)
+    {
+        Long size = components.get(component);
+        if (size == null)
+            throw new IllegalArgumentException("Component " + component + " is not present in the manifest");
+        return size;
+    }
+
+    public long totalSize()
+    {
+        long totalSize = 0;
+        for (Long size : components.values())
+            totalSize += size;
+        return totalSize;
+    }
+
+    public List<Component> components()
+    {
+        return new ArrayList<>(components.keySet());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ComponentManifest))
+            return false;
+
+        ComponentManifest that = (ComponentManifest) o;
+        return components.equals(that.components);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return components.hashCode();
+    }
+
+    public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
+    {
+        public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUnsignedVInt(manifest.components.size());
+            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+            {
+                out.writeUTF(entry.getKey().name);
+                out.writeUnsignedVInt(entry.getValue());
+            }
+        }
+
+        public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException
+        {
+            int size = (int) in.readUnsignedVInt();
+
+            LinkedHashMap<Component, Long> components = new LinkedHashMap<>(size);
+
+            for (int i = 0; i < size; i++)
+            {
+                Component component = Component.parse(in.readUTF());
+                long length = in.readUnsignedVInt();
+                components.put(component, length);
+            }
+
+            return new ComponentManifest(components);
+        }
+
+        public long serializedSize(ComponentManifest manifest, int version)
+        {
+            long size = TypeSizes.sizeofUnsignedVInt(manifest.components.size());
+            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+            {
+                size += TypeSizes.sizeof(entry.getKey().name);
+                size += TypeSizes.sizeofUnsignedVInt(entry.getValue());
+            }
+            return size;
+        }
+    };
+
+    @Override
+    public Iterator<Component> iterator()
+    {
+        return Iterators.unmodifiableIterator(components.keySet().iterator());
+    }
+}
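
ComponentManifest is an insertion-ordered map from sstable component to its on-disk length; the sender fills it from File.length() (see getComponentManifest above), and its totalSize() is what CassandraStreamHeader reports as the transfer size for entire-sstable streams. A minimal usage sketch with fabricated lengths:

    import java.util.LinkedHashMap;

    import org.apache.cassandra.db.streaming.ComponentManifest;
    import org.apache.cassandra.io.sstable.Component;

    public class ComponentManifestExample
    {
        public static void main(String[] args)
        {
            // Fabricated lengths, purely illustrative; the patch derives them from the files on disk.
            LinkedHashMap<Component, Long> sizes = new LinkedHashMap<>();
            sizes.put(Component.DATA, 1024L);
            sizes.put(Component.PRIMARY_INDEX, 128L);
            sizes.put(Component.STATS, 64L);

            ComponentManifest manifest = new ComponentManifest(sizes);
            assert manifest.totalSize() == 1216L;
            assert manifest.sizeOf(Component.DATA) == 1024L;

            for (Component component : manifest)    // iteration follows insertion order
                System.out.println(component.name);
        }
    }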

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
deleted file mode 100644
index c71edfb..0000000
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.streaming;
-
-import java.io.IOException;
-
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.StreamMessageHeader;
-import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
-/**
- * CassandraStreamReader that reads from streamed compressed SSTable
- */
-public class CompressedCassandraStreamReader extends CassandraStreamReader
-{
-    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamReader.class);
-
-    protected final CompressionInfo compressionInfo;
-
-    public CompressedCassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
-    {
-        super(header, streamHeader, session);
-        this.compressionInfo = streamHeader.compressionInfo;
-    }
-
-    /**
-     * @return SSTable transferred
-     * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
-     */
-    @Override
-    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
-    {
-        long totalSize = totalSize();
-
-        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
-
-        if (cfs == null)
-        {
-            // schema was dropped during streaming
-            throw new IOException("CF " + tableId + " was dropped during streaming");
-        }
-
-        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
-                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
-                     cfs.getTableName());
-
-        StreamDeserializer deserializer = null;
-        SSTableMultiWriter writer = null;
-        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
-        {
-            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
-            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
-            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
-            String filename = writer.getFilename();
-            int sectionIdx = 0;
-            for (SSTableReader.PartitionPositionBounds section : sections)
-            {
-                assert cis.getTotalCompressedBytesRead() <= totalSize;
-                long sectionLength = section.upperPosition - section.lowerPosition;
-
-                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
-                // skip to beginning of section inside chunk
-                cis.position(section.lowerPosition);
-                in.reset(0);
-
-                while (in.getBytesRead() < sectionLength)
-                {
-                    writePartition(deserializer, writer);
-                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
-                }
-            }
-            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
-                         session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
-            return writer;
-        }
-        catch (Throwable e)
-        {
-            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
-            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
-            if (writer != null)
-            {
-                writer.abort(e);
-            }
-            if (extractIOExceptionCause(e).isPresent())
-                throw e;
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    protected long totalSize()
-    {
-        long size = 0;
-        // calculate total length of transferring chunks
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
-    }
-}

