You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/07/27 18:36:02 UTC
[1/3] cassandra git commit: Stream entire SSTables when possible
Repository: cassandra
Updated Branches:
refs/heads/trunk 6ba2fb939 -> 47a12c52a
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
new file mode 100644
index 0000000..8256ac6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.KeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraOutgoingFileTest
+{
+    public static final String KEYSPACE = "CassandraOutgoingFileTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+
+    /**
+     * Creates the test keyspace, writes one row for each of the keys "0".."9" into {@link #CF_STANDARD}
+     * and runs a major compaction so that a single live sstable remains for the tests to inspect.
+     */
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                .caching(CachingParams.CACHE_NOTHING));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    /** A single range from the minimum token up to the sstable's last key covers the whole sstable. */
+    @Test
+    public void validateFullyContainedIn_SingleContiguousRange_Succeeds()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
+
+        CassandraOutgoingFile cof = newOutgoingFile(requestedRanges);
+
+        assertTrue(cof.contained(requestedRanges, sstable));
+    }
+
+    /** A range ending at the third key leaves the remaining keys uncovered. */
+    @Test
+    public void validateFullyContainedIn_PartialOverlap_Fails()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2)));
+
+        CassandraOutgoingFile cof = newOutgoingFile(requestedRanges);
+
+        assertFalse(cof.contained(requestedRanges, sstable));
+    }
+
+    /** Several overlapping ranges whose union spans the whole sstable still count as fully containing it. */
+    @Test
+    public void validateFullyContainedIn_SplitRange_Succeeds()
+    {
+        List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)),
+                                                           new Range<>(getTokenAtIndex(2), getTokenAtIndex(6)),
+                                                           new Range<>(getTokenAtIndex(5), sstable.last.getToken()));
+
+        CassandraOutgoingFile cof = newOutgoingFile(requestedRanges);
+
+        assertTrue(cof.contained(requestedRanges, sstable));
+    }
+
+    /**
+     * Builds a {@link CassandraOutgoingFile} for the shared sstable, restricted to {@code requestedRanges}.
+     * NOTE(review): the Ref acquired through {@code sstable.ref()} is never released by these tests —
+     * confirm whether that matters for leak detection.
+     */
+    private static CassandraOutgoingFile newOutgoingFile(List<Range<Token>> requestedRanges)
+    {
+        return new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
+                                         sstable.getPositionsForRanges(requestedRanges),
+                                         requestedRanges, sstable.estimatedKeys());
+    }
+
+    /**
+     * Returns the key at 0-based position {@code i}, in the order {@link KeyIterator} yields the
+     * shared sstable's keys.
+     *
+     * @throws IllegalArgumentException if {@code i} is negative or not smaller than the number of keys
+     */
+    private DecoratedKey getKeyAtIndex(int i)
+    {
+        if (i < 0)
+            throw new IllegalArgumentException("Negative key index: " + i);
+
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            int position = 0;
+            while (iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                if (position == i)
+                    return key;
+                position++;
+            }
+            throw new IllegalArgumentException("Key index " + i + " out of range; sstable has " + position + " keys");
+        }
+    }
+
+    /** Token of the key at 0-based position {@code i}; see {@link #getKeyAtIndex(int)}. */
+    private Token getTokenAtIndex(int i)
+    {
+        return getKeyAtIndex(i).getToken();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index 061a4b2..e48abf6 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -15,20 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.db.streaming;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import org.junit.Test;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.db.streaming.CassandraStreamHeader.CassandraStreamHeaderSerializer;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.serializers.SerializationUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class CassandraStreamHeaderTest
{
@@ -37,14 +42,51 @@ public class CassandraStreamHeaderTest
{
String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
- CassandraStreamHeader header = new CassandraStreamHeader(BigFormat.latestVersion,
- SSTableFormat.Type.BIG,
- 0,
- new ArrayList<>(),
- ((CompressionMetadata) null),
- 0,
- SerializationHeader.makeWithoutStats(metadata).toComponent());
+ CassandraStreamHeader header =
+ CassandraStreamHeader.builder()
+ .withSSTableFormat(SSTableFormat.Type.BIG)
+ .withSSTableVersion(BigFormat.latestVersion)
+ .withSSTableLevel(0)
+ .withEstimatedKeys(0)
+ .withSections(Collections.emptyList())
+ .withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent())
+ .withTableId(metadata.id)
+ .build();
SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
}
+
+    /**
+     * Round-trips a header for an entire-sstable transfer (component manifest, first key, table id)
+     * through the serializer.
+     */
+    @Test
+    public void serializerTest_EntireSSTableTransfer()
+    {
+        String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+        TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
+
+        // Plain map instead of double-brace initialization, which would define an anonymous
+        // LinkedHashMap subclass just to add one entry.
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>();
+        components.put(Component.DATA, 100L);
+        ComponentManifest manifest = new ComponentManifest(components);
+
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(SSTableFormat.Type.BIG)
+                                 .withSSTableVersion(BigFormat.latestVersion)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(0)
+                                 .withSections(Collections.emptyList())
+                                 .withSerializationHeader(SerializationHeader.makeWithoutStats(metadata).toComponent())
+                                 .withComponentManifest(manifest)
+                                 .isEntireSSTable(true)
+                                 .withFirstKey(Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+                                 .withTableId(metadata.id)
+                                 .build();
+
+        // The first key above is decorated with Murmur3Partitioner, which is also the partitioner the
+        // test serializer hands to deserialize().
+        SerializationUtils.assertSerializationCycle(header, new TestableCassandraStreamHeaderSerializer());
+    }
+
+ /**
+ * Header serializer for serializerTest_EntireSSTableTransfer. Its two-argument deserialize
+ * delegates to the three-argument overload and resolves every table id to
+ * Murmur3Partitioner.instance, the partitioner that test uses to decorate the header's first key.
+ * NOTE(review): presumably needed because table "ks.tbl" is never registered in the schema, so
+ * the default partitioner lookup by table id would not find it — confirm.
+ */
+ private static class TestableCassandraStreamHeaderSerializer extends CassandraStreamHeaderSerializer
+ {
+ @Override
+ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return deserialize(in, version, tableId -> Murmur3Partitioner.instance);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
new file mode 100644
index 0000000..f478a00
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/ComponentManifestTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.ByteBufDataInputPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputPlus;
+import org.apache.cassandra.serializers.SerializationUtils;
+
+import static org.junit.Assert.assertNotEquals;
+
+public class ComponentManifestTest
+{
+    /**
+     * Builds a manifest listing a single Data component of 100 bytes. A plain map is used instead of
+     * double-brace initialization, which would define an anonymous LinkedHashMap subclass.
+     */
+    private static ComponentManifest singleDataComponentManifest()
+    {
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>();
+        components.put(Component.DATA, 100L);
+        return new ComponentManifest(components);
+    }
+
+    /** A manifest survives a serialize/deserialize round trip unchanged. */
+    @Test
+    public void testSerialization()
+    {
+        SerializationUtils.assertSerializationCycle(singleDataComponentManifest(), ComponentManifest.serializer);
+    }
+
+    /** Corrupting the serialized bytes must make deserialization fail with an EOFException. */
+    @Test(expected = EOFException.class)
+    public void testSerialization_FailsOnBadBytes() throws IOException
+    {
+        ByteBuf buf = Unpooled.buffer(512);
+        ComponentManifest expected = singleDataComponentManifest();
+
+        DataOutputPlus output = new ByteBufDataOutputPlus(buf);
+        ComponentManifest.serializer.serialize(expected, output, MessagingService.VERSION_40);
+
+        // Overwrite the first four serialized bytes with a negative int.
+        buf.setInt(0, -100);
+
+        DataInputPlus input = new ByteBufDataInputPlus(buf);
+        ComponentManifest actual = ComponentManifest.serializer.deserialize(input, MessagingService.VERSION_40);
+
+        // Only reached if deserialization unexpectedly succeeds; the result must then differ.
+        assertNotEquals(expected, actual);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index cee8802..fccb344 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,10 @@ public class BigTableWriterTest extends AbstractTransactionalTest
private TestableBTW(Descriptor desc)
{
- this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)));
+ this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null,
+ new SerializationHeader(true, cfs.metadata(),
+ cfs.metadata().regularAndStaticColumns(),
+ EncodingStats.NO_STATS)));
}
private TestableBTW(Descriptor desc, SSTableTxnWriter sw)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index fcc9191..c61ee1f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -318,6 +319,7 @@ public class LegacySSTableTest
List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER,
sstable.ref(),
sstable.getPositionsForRanges(ranges),
+ ranges,
sstable.estimatedKeysForRanges(ranges)));
new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
new file mode 100644
index 0000000..c3931e0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class BigTableZeroCopyWriterTest
+{
+ public static final String KEYSPACE1 = "BigTableBlockWriterTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static final String CF_STANDARD2 = "Standard2";
+ public static final String CF_INDEXED = "Indexed1";
+ public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+ public static SSTableReader sstable;
+ public static ColumnFamilyStore store;
+ private static int expectedRowCount;
+
+ @BeforeClass
+ public static void defineSchema() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
+ SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED, true),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLOWINDEXINTERVAL)
+ .minIndexInterval(8)
+ .maxIndexInterval(256)
+ .caching(CachingParams.CACHE_NOTHING));
+
+ String ks = KEYSPACE1;
+ String cf = "Standard1";
+
+ // clear and create just one sstable for this test
+ Keyspace keyspace = Keyspace.open(ks);
+ store = keyspace.getColumnFamilyStore(cf);
+ store.clearUnsafe();
+ store.disableAutoCompaction();
+
+ DecoratedKey firstKey = null, lastKey = null;
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(i));
+ if (firstKey == null)
+ firstKey = key;
+ if (lastKey == null)
+ lastKey = key;
+ if (store.metadata().partitionKeyType.compare(lastKey.getKey(), key.getKey()) < 0)
+ lastKey = key;
+
+ new RowUpdateBuilder(store.metadata(), timestamp, key.getKey())
+ .clustering("col")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ expectedRowCount++;
+ }
+ store.forceBlockingFlush();
+
+ sstable = store.getLiveSSTables().iterator().next();
+ }
+
+ @Test
+ public void writeDataFile_DataInputPlus()
+ {
+ writeDataTestCycle(buffer -> new DataInputStreamPlus(new ByteArrayInputStream(buffer.array())));
+ }
+
+ @Test
+ public void writeDataFile_RebufferingByteBufDataInputPlus()
+ {
+ writeDataTestCycle(buffer -> {
+ EmbeddedChannel channel = new EmbeddedChannel();
+ RebufferingByteBufDataInputPlus inputPlus = new RebufferingByteBufDataInputPlus(1 << 10, 1 << 20, channel.config());
+ inputPlus.append(Unpooled.wrappedBuffer(buffer));
+ return inputPlus;
+ });
+ }
+
+
+ private void writeDataTestCycle(Function<ByteBuffer, DataInputPlus> bufferMapper)
+ {
+ File dir = store.getDirectories().getDirectoryForNewSSTables();
+ Descriptor desc = store.newSSTableDescriptor(dir);
+ TableMetadataRef metadata = Schema.instance.getTableMetadataRef(desc);
+
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+ Set<Component> componentsToWrite = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX,
+ Component.STATS);
+
+ BigTableZeroCopyWriter btzcw = new BigTableZeroCopyWriter(desc, metadata, txn, componentsToWrite);
+
+ for (Component component : componentsToWrite)
+ {
+ if (Files.exists(Paths.get(desc.filenameFor(component))))
+ {
+ Pair<DataInputPlus, Long> pair = getSSTableComponentData(sstable, component, bufferMapper);
+
+ btzcw.writeComponent(component.type, pair.left, pair.right);
+ }
+ }
+
+ Collection<SSTableReader> readers = btzcw.finish(true);
+
+ SSTableReader reader = readers.toArray(new SSTableReader[0])[0];
+
+ assertNotEquals(sstable.getFilename(), reader.getFilename());
+ assertEquals(sstable.estimatedKeys(), reader.estimatedKeys());
+ assertEquals(sstable.isPendingRepair(), reader.isPendingRepair());
+
+ assertRowCount(expectedRowCount);
+ }
+
+ private void assertRowCount(int expected)
+ {
+ int count = 0;
+ for (int i = 0; i < store.metadata().params.minIndexInterval; i++)
+ {
+ DecoratedKey dk = Util.dk(String.valueOf(i));
+ UnfilteredRowIterator rowIter = sstable.iterator(dk,
+ Slices.ALL,
+ ColumnFilter.all(store.metadata()),
+ false,
+ SSTableReadsListener.NOOP_LISTENER);
+ while (rowIter.hasNext())
+ {
+ rowIter.next();
+ count++;
+ }
+ }
+ assertEquals(expected, count);
+ }
+
+ private Pair<DataInputPlus, Long> getSSTableComponentData(SSTableReader sstable, Component component,
+ Function<ByteBuffer, DataInputPlus> bufferMapper)
+ {
+ FileHandle componentFile = new FileHandle.Builder(sstable.descriptor.filenameFor(component))
+ .bufferSize(1024).complete();
+ ByteBuffer buffer = ByteBuffer.allocate((int) componentFile.channel.size());
+ componentFile.channel.read(buffer, 0);
+ buffer.flip();
+
+ DataInputPlus inputPlus = bufferMapper.apply(buffer);
+
+ return Pair.create(inputPlus, componentFile.channel.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
index 2961d9a..69df040 100644
--- a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
+++ b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.net.async;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import org.junit.After;
import org.junit.Assert;
@@ -28,7 +29,9 @@ import org.junit.Before;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
public class RebufferingByteBufDataInputPlusTest
{
@@ -151,4 +154,99 @@ public class RebufferingByteBufDataInputPlusTest
inputPlus.markClose();
Assert.assertEquals(size, inputPlus.available());
}
+
+ // consumeUntil tests; consumeUntilTestCycle arguments are (nBuffs, buffSize, startOffset, len).
+ // One 8-byte buffer, consume the first 4 bytes.
+ @Test
+ public void consumeUntil_SingleBuffer_Partial_HappyPath() throws IOException
+ {
+ consumeUntilTestCycle(1, 8, 0, 4);
+ }
+
+ // One 8-byte buffer, consume all 8 bytes.
+ @Test
+ public void consumeUntil_SingleBuffer_AllBytes_HappyPath() throws IOException
+ {
+ consumeUntilTestCycle(1, 8, 0, 8);
+ }
+
+ // Two 8-byte buffers, consume 13 bytes (crosses the buffer boundary).
+ @Test
+ public void consumeUntil_MultipleBufferr_Partial_HappyPath() throws IOException
+ {
+ consumeUntilTestCycle(2, 8, 0, 13);
+ }
+
+ // Two 8-byte buffers, consume all 16 bytes.
+ @Test
+ public void consumeUntil_MultipleBuffer_AllBytes_HappyPath() throws IOException
+ {
+ consumeUntilTestCycle(2, 8, 0, 16);
+ }
+
+ // One 8-byte buffer, request 9 bytes: one more than available, so EOFException is expected.
+ @Test(expected = EOFException.class)
+ public void consumeUntil_SingleBuffer_Fails() throws IOException
+ {
+ consumeUntilTestCycle(1, 8, 0, 9);
+ }
+
+ // Two 8-byte buffers, request 17 bytes: one more than available, so EOFException is expected.
+ @Test(expected = EOFException.class)
+ public void consumeUntil_MultipleBuffer_Fails() throws IOException
+ {
+ consumeUntilTestCycle(2, 8, 0, 17);
+ }
+
+    /**
+     * Appends {@code nBuffs} buffers to {@code inputPlus}. Each is allocated with {@code buffSize},
+     * filled to capacity, and every byte of the j-th buffer holds the value j.
+     * The method then skips {@code startOffset} bytes and consumes {@code len} bytes via
+     * {@code inputPlus.consumeUntil} into a {@link TestableWritableByteChannel}, and asserts that
+     * exactly the expected bytes arrived.
+     */
+    private void consumeUntilTestCycle(int nBuffs, int buffSize, int startOffset, int len) throws IOException
+    {
+        byte[] expectedBytes = new byte[len];
+        int count = 0;
+        for (int j = 0; j < nBuffs; j++)
+        {
+            ByteBuf buf = channel.alloc().buffer(buffSize);
+            for (int i = 0; i < buf.capacity(); i++)
+            {
+                buf.writeByte(j);
+                // record only the bytes inside the [startOffset, startOffset + len) window
+                if (count >= startOffset && (count - startOffset) < len)
+                    expectedBytes[count - startOffset] = (byte) j;
+                count++;
+            }
+
+            inputPlus.append(buf);
+        }
+        inputPlus.append(channel.alloc().buffer(0));
+
+        TestableWritableByteChannel wbc = new TestableWritableByteChannel(len);
+
+        inputPlus.skipBytesFully(startOffset);
+        BufferedDataOutputStreamPlus writer = new BufferedDataOutputStreamPlus(wbc);
+        inputPlus.consumeUntil(writer, len);
+        // make sure nothing is still held in the writer's own buffer before inspecting the channel
+        writer.flush();
+
+        // String.format needs %d placeholders; SLF4J-style {} would be printed literally.
+        Assert.assertEquals(String.format("Test with %d buffers starting at %d consuming %d bytes", nBuffs, startOffset, len),
+                            len, wbc.writtenBytes.readableBytes());
+
+        Assert.assertArrayEquals(expectedBytes, wbc.writtenBytes.array());
+    }
+
+    /**
+     * In-memory {@link WritableByteChannel} that appends everything written to it to {@link #writtenBytes}.
+     * Note that {@link #write(ByteBuffer)} does not reject writes after {@link #close()}.
+     */
+    private static class TestableWritableByteChannel implements WritableByteChannel
+    {
+        private boolean isOpen = true;
+        public final ByteBuf writtenBytes;
+
+        public TestableWritableByteChannel(int initialCapacity)
+        {
+            writtenBytes = Unpooled.buffer(initialCapacity);
+        }
+
+        @Override
+        public int write(ByteBuffer src) throws IOException
+        {
+            int size = src.remaining();
+            writtenBytes.writeBytes(src);
+            return size;
+        }
+
+        @Override
+        public boolean isOpen()
+        {
+            return isOpen;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            isOpen = false;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
index 7ce4ec5..b88b56f 100644
--- a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
+++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
@@ -63,6 +63,5 @@ public class SerializationUtils
public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer)
{
assertSerializationCycle(src, serializer, MessagingService.current_version);
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 78b3094..8ebe333 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -92,7 +92,7 @@ public class StreamTransferTaskTest
{
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
- task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), 1));
+ task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), ranges, 1));
}
assertEquals(2, task.getTotalNumberOfFiles());
@@ -144,7 +144,7 @@ public class StreamTransferTaskTest
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
Ref<SSTableReader> ref = sstable.selfRef();
refs.add(ref);
- task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), 1));
+ task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), ranges, 1));
}
assertEquals(2, task.getTotalNumberOfFiles());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 72b9cbe..bc501be 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -278,6 +278,7 @@ public class StreamingTransferTest
streams.add(new CassandraOutgoingFile(operation,
sstables.get(sstable),
sstable.getPositionsForRanges(ranges),
+ ranges,
sstable.estimatedKeysForRanges(ranges)));
}
return streams;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/3] cassandra git commit: Stream entire SSTables when possible
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
deleted file mode 100644
index c5b0c53..0000000
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.streaming;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * CassandraStreamWriter for compressed SSTable.
- */
-public class CompressedCassandraStreamWriter extends CassandraStreamWriter
-{
- private static final int CHUNK_SIZE = 1 << 16;
-
- private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamWriter.class);
-
- private final CompressionInfo compressionInfo;
-
- public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
- {
- super(sstable, sections, session);
- this.compressionInfo = compressionInfo;
- }
-
- @Override
- public void write(DataOutputStreamPlus out) throws IOException
- {
- assert out instanceof ByteBufDataOutputStreamPlus;
- ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
- long totalSize = totalSize();
- logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
- sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
- try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
- {
- long progress = 0L;
- // calculate chunks to transfer. we want to send continuous chunks altogether.
- List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
-
- int sectionIdx = 0;
-
- // stream each of the required sections of the file
- for (final SSTableReader.PartitionPositionBounds section : sections)
- {
- // length of the section to stream
- long length = section.upperPosition - section.lowerPosition;
-
- logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
-
- // tracks write progress
- long bytesTransferred = 0;
- while (bytesTransferred < length)
- {
- final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
- limiter.acquire(toTransfer);
-
- ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
- long lastWrite;
- try
- {
- lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred);
- assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
- outBuffer.flip();
- output.writeToChannel(outBuffer);
- }
- catch (IOException e)
- {
- FileUtils.clean(outBuffer);
- throw e;
- }
-
- bytesTransferred += lastWrite;
- progress += lastWrite;
- session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
- }
- }
- logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
- session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
- }
- }
-
- @Override
- protected long totalSize()
- {
- long size = 0;
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
- return size;
- }
-
- // chunks are assumed to be sorted by offset
- private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
- {
- List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
- SSTableReader.PartitionPositionBounds lastSection = null;
- for (CompressionMetadata.Chunk chunk : chunks)
- {
- if (lastSection != null)
- {
- if (chunk.offset == lastSection.upperPosition)
- {
- // extend previous section to end of this chunk
- lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
- }
- else
- {
- transferSections.add(lastSection);
- lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
- }
- }
- else
- {
- lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
- }
- }
- if (lastSection != null)
- transferSections.add(lastSection);
- return transferSections;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index 2f56786..c0278e8 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -60,7 +60,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
private long bufferOffset = 0;
/**
- * The current {@link CompressedCassandraStreamReader#sections} offset in the stream.
+ * The current {@link CassandraCompressedStreamReader#sections} offset in the stream.
*/
private long current = 0;
@@ -98,7 +98,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
}
/**
- * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}.
+ * Invoked when crossing into the next stream boundary in {@link CassandraCompressedStreamReader#sections}.
*/
public void position(long position) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
new file mode 100644
index 0000000..cf93bc2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+/**
+ * Used by the streaming code to read an SSTable stream off a channel; {@link #read} returns the resulting writer.
+ */
+public interface IStreamReader
+{
+    SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 9daac7c..a81db85 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -33,6 +33,10 @@ public class Component
final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
+ /**
+ * WARNING: Be careful while changing the names or string representation of the enum
+ * members. Streaming code depends on the names during streaming (Ref: CASSANDRA-14556).
+ */
public enum Type
{
// the base data for an sstable: the remaining components can be regenerated
@@ -60,6 +64,7 @@ public class Component
CUSTOM(null);
final String repr;
+
Type(String repr)
{
this.repr = repr;
@@ -120,7 +125,7 @@ public class Component
* @return the component corresponding to {@code name}. Note that this always return a component as an unrecognized
* name is parsed into a CUSTOM component.
*/
- static Component parse(String name)
+ public static Component parse(String name)
{
Type type = Type.fromRepresentation(name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index ebc35e7..4ba0533 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,7 +131,7 @@ public class SSTableLoader implements StreamEventHandler
List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
Ref<SSTableReader> ref = sstable.ref();
- OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);
+ OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, tokenRanges, estimatedKeys);
streamingDetails.put(endpoint, stream);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
new file mode 100644
index 0000000..400f119
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * An {@link SSTableMultiWriter} that writes the components of an SSTable verbatim: each component's bytes are
+ * copied from a {@link DataInputPlus} straight into that component's file by {@link #writeComponent}.
+ * Partitions cannot be appended; once every component is written, {@link #finished()} opens a reader over them.
+ */
+public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);
+
+    private final TableMetadataRef metadata;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    // 2 << 20 bytes = 2 MiB off-heap buffer for each component writer
+    private static final SequentialWriterOption WRITER_OPTION =
+        SequentialWriterOption.newBuilder()
+                              .trickleFsync(false)
+                              .bufferSize(2 << 20)
+                              .bufferType(BufferType.OFF_HEAP)
+                              .build();
+
+    private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
+        ImmutableSet.of(Component.DATA,
+                        Component.PRIMARY_INDEX,
+                        Component.SUMMARY,
+                        Component.STATS,
+                        Component.COMPRESSION_INFO,
+                        Component.FILTER,
+                        Component.DIGEST,
+                        Component.CRC);
+
+    /**
+     * Registers the new SSTable with {@code txn} and opens one {@link SequentialWriter} per requested component.
+     *
+     * @param descriptor descriptor of the SSTable being written
+     * @param metadata   metadata of the table the SSTable belongs to
+     * @param txn        lifecycle transaction that tracks the new SSTable
+     * @param components components to write; each must be in {@code SUPPORTED_COMPONENTS}
+     * @throws AssertionError if {@code components} contains an unsupported component
+     */
+    public BigTableZeroCopyWriter(Descriptor descriptor,
+                                  TableMetadataRef metadata,
+                                  LifecycleTransaction txn,
+                                  final Collection<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+
+        // Validate before touching the transaction, so a rejected writer is never tracked as a new SSTable.
+        if (!SUPPORTED_COMPONENTS.containsAll(components))
+            throw new AssertionError(format("Unsupported streaming component detected %s",
+                                            Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));
+
+        // The SSTable is tracked before any component file is opened below.
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.componentWriters = new EnumMap<>(Component.Type.class);
+
+        for (Component c : components)
+            componentWriters.put(c.type, makeWriter(descriptor, c));
+    }
+
+    /** Opens a writer for {@code component} of {@code descriptor}, with strict flushing disabled. */
+    private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
+    {
+        return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
+    }
+
+    /**
+     * Copies exactly {@code size} bytes from {@code in} to {@code out} in chunks of at most 1 MiB, then syncs {@code out}.
+     *
+     * @throws FSWriteError wrapping any {@link IOException} raised while reading or writing
+     */
+    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    {
+        final int BUFFER_SIZE = 1 << 20;
+        long bytesRead = 0;
+        byte[] buff = new byte[BUFFER_SIZE];
+        try
+        {
+            while (bytesRead < size)
+            {
+                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+                in.readFully(buff, 0, toRead);
+                out.write(buff, 0, toRead);
+                bytesRead += toRead;
+            }
+            out.sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
+    }
+
+    /** Not supported: this writer only copies whole components via {@link #writeComponent}. */
+    @Override
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        throw new UnsupportedOperationException("Operation not supported by BigTableZeroCopyWriter");
+    }
+
+    /** {@code repairedAt} and {@code maxDataAge} are ignored; equivalent to {@link #finish(boolean)}. */
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return finish(openResult);
+    }
+
+    /** Passes {@code openResult} to {@link #setOpenResult} and returns {@link #finished()}. */
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        setOpenResult(openResult);
+        return finished();
+    }
+
+    /** Opens a reader over the written components on first call, and returns that same reader thereafter. */
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        if (finalReader == null)
+            finalReader = SSTableReader.open(descriptor, components, metadata);
+
+        return ImmutableList.of(finalReader);
+    }
+
+    /**
+     * {@code openResult} is not used: {@link #finished()} always opens a reader. Returns {@code this}, as
+     * the {@link SSTableMultiWriter} return type implies, so the call can be chained.
+     */
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        return this;
+    }
+
+    /** The write position is not tracked by this writer; always returns 0. */
+    @Override
+    public long getFilePointer()
+    {
+        return 0;
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return metadata.id;
+    }
+
+    /** Commits every component writer, threading failures through {@code accumulate}. */
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.commit(accumulate);
+        return accumulate;
+    }
+
+    /** Aborts every component writer, threading failures through {@code accumulate}. */
+    @Override
+    public Throwable abort(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.abort(accumulate);
+        return accumulate;
+    }
+
+    /** Prepares every component writer to commit. */
+    @Override
+    public void prepareToCommit()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.prepareToCommit();
+    }
+
+    /**
+     * Closes every component writer. A failure closing one writer does not stop the rest from being closed;
+     * the first failure is rethrown, with any later ones attached as suppressed exceptions.
+     */
+    @Override
+    public void close()
+    {
+        Throwable failure = null;
+        for (SequentialWriter writer : componentWriters.values())
+        {
+            try
+            {
+                writer.close();
+            }
+            catch (Throwable t)
+            {
+                if (failure == null)
+                    failure = t;
+                else
+                    failure.addSuppressed(t);
+            }
+        }
+
+        if (failure instanceof RuntimeException)
+            throw (RuntimeException) failure;
+        if (failure instanceof Error)
+            throw (Error) failure;
+        if (failure != null)
+            throw new RuntimeException(failure);
+    }
+
+    /**
+     * Copies {@code size} bytes of component {@code type} from {@code in} into that component's file.
+     *
+     * @throws IllegalArgumentException if {@code type} was not among the components passed to the constructor
+     * @throws FSWriteError if reading or writing the component fails
+     */
+    public void writeComponent(Component.Type type, DataInputPlus in, long size)
+    {
+        SequentialWriter writer = componentWriters.get(type);
+        if (writer == null)
+            throw new IllegalArgumentException(format("No writer for component %s; this writer handles %s",
+                                                      type, componentWriters.keySet()));
+
+        logger.info("Writing component {} to {} length {}", type, writer.getPath(), prettyPrintMemory(size));
+
+        // RebufferingByteBufDataInputPlus gets the consumeUntil() path; any other input is copied via a heap buffer.
+        if (in instanceof RebufferingByteBufDataInputPlus)
+            write((RebufferingByteBufDataInputPlus) in, size, writer);
+        else
+            write(in, size, writer);
+    }
+
+    /**
+     * Transfers {@code size} bytes from {@code in} into {@code writer} using {@code consumeUntil}.
+     *
+     * @throws FSWriteError if the transfer fails or moves a different number of bytes than {@code size}
+     */
+    private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
+    {
+        logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
+
+        try
+        {
+            long bytesWritten = in.consumeUntil(writer, size);
+
+            if (bytesWritten != size)
+                throw new IOException(format("Failed to read correct number of bytes from channel %s: expected %d, got %d",
+                                             writer.getPath(), size, bytesWritten));
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, writer.getPath());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 54122ee..56d88f7 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -24,11 +24,9 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import net.nicoulaj.compilecommand.annotations.DontInline;
-
import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.vint.VIntCoding;
@@ -341,7 +339,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
}
@Override
- public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+ public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException
{
if (strictFlushing)
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CheckedFunction.java b/src/java/org/apache/cassandra/io/util/CheckedFunction.java
new file mode 100644
index 0000000..ec1ce9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CheckedFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+/**
+ * A {@link java.util.function.Function} counterpart whose {@link #apply} may throw a checked exception.
+ *
+ * @param <T> the type of the input
+ * @param <R> the type of the result
+ * @param <E> the exception type {@link #apply} may throw
+ */
+@FunctionalInterface
+public interface CheckedFunction<T, R, E extends Exception>
+{
+    /**
+     * Applies this function to {@code input}.
+     *
+     * @throws E if the function fails
+     */
+    R apply(T input) throws E;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index a9dbb68..16be42f 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-import com.google.common.base.Function;
-
import org.apache.cassandra.utils.vint.VIntCoding;
/**
@@ -41,7 +39,7 @@ public interface DataOutputPlus extends DataOutput
* Safe way to operate against the underlying channel. Impossible to stash a reference to the channel
* and forget to flush
*/
- <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
+ <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException;
default void writeVInt(long i) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 086f5c9..ef51888 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -25,12 +25,12 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import com.google.common.base.Preconditions;
+
import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.vint.VIntCoding;
-import com.google.common.base.Preconditions;
-
/**
* Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
* via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index e71f2fa..3eb1a7d 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -138,11 +138,22 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
*/
public SequentialWriter(File file, SequentialWriterOption option)
{
+ this(file, option, true); // strict flushing is enabled by default
+ }
+
+ /**
+ * Create SequentialWriter for given file with specific writer option and flushing mode.
+ * @param file file to write to; its absolute path is stored in filePath
+ * @param option writer option; its allocateBuffer() supplies the write buffer
+ * @param strictFlushing when true, BufferedDataOutputStreamPlus.applyToChannel throws UnsupportedOperationException; the two-argument constructor passes true
+ */
+ public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
+ {
super(openChannel(file), option.allocateBuffer());
- strictFlushing = true;
- fchannel = (FileChannel)channel;
+ this.strictFlushing = strictFlushing;
+ this.fchannel = (FileChannel)channel;
- filePath = file.getAbsolutePath();
+ this.filePath = file.getAbsolutePath();
this.option = option;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
index 54b4cb1..d9ef010 100644
--- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -378,7 +378,7 @@ public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlu
}
@Override
- public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+ public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException
{
return f.apply(channel);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
index 0473465..a77cb07 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
@@ -22,10 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-import com.google.common.base.Function;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
+import org.apache.cassandra.io.util.CheckedFunction;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
@@ -82,7 +81,7 @@ public class ByteBufDataOutputPlus extends ByteBufOutputStream implements DataOu
}
@Override
- public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+ public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
index 3a544e4..777bc3e 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.net.async;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -35,6 +38,7 @@ import io.netty.util.concurrent.Future;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
import org.apache.cassandra.streaming.StreamSession;
/**
@@ -49,6 +53,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
private final StreamSession session;
private final Channel channel;
private final int bufferSize;
+ private static final Logger logger = LoggerFactory.getLogger(ByteBufDataOutputStreamPlus.class);
/**
* Tracks how many bytes we've written to the netty channel. This more or less follows the channel's
@@ -70,7 +75,6 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
this.channel = channel;
this.currentBuf = buffer;
this.bufferSize = bufferSize;
-
channelRateLimiter = new Semaphore(channel.config().getWriteBufferHighWaterMark(), true);
}
@@ -114,8 +118,9 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
doFlush(buffer.position());
int byteCount = buf.readableBytes();
+
if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 5, TimeUnit.MINUTES))
- throw new IOException("outbound channel was not writable");
+ throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", byteCount));
// the (possibly naive) assumption that we should always flush after each incoming buf
ChannelFuture channelFuture = channel.writeAndFlush(buf);
@@ -135,6 +140,53 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
return channelFuture;
}
+    /**
+     * Writes all of {@code f} to the netty channel, at most {@code bufferSize} bytes per file region, and closes
+     * {@code f} once the last region is flushed, a write fails, an error is raised here, or the file is empty.
+     *
+     * @param f file channel to stream; this method takes ownership of it
+     * @param limiter stream rate limiter charged for each region before it is written
+     * @return number of bytes transferred
+     * @throws IOException if the file size cannot be read or channel permits cannot be acquired within 5 minutes
+     */
+    public long writeToChannel(FileChannel f, StreamRateLimiter limiter) throws IOException
+    {
+        long bytesTransferred = 0;
+        try
+        {
+            final long length = f.size();
+            if (length == 0)
+                f.close(); // no region will be written, so no completion listener would ever close it
+
+            while (bytesTransferred < length)
+            {
+                int toRead = (int) Math.min(bufferSize, length - bytesTransferred);
+                if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, toRead, 5, TimeUnit.MINUTES))
+                    throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", toRead));
+                limiter.acquire(toRead);
+
+                // create the region only once it will definitely be written, so failing above never strands one
+                NonClosingDefaultFileRegion fileRegion = new NonClosingDefaultFileRegion(f, bytesTransferred, toRead);
+                bytesTransferred += toRead;
+                final boolean shouldClose = (bytesTransferred == length); // this is the last buffer, can safely close channel
+
+                channel.writeAndFlush(fileRegion).addListener(future -> {
+                    handleBuffer(future, toRead);
+                    if ((shouldClose || !future.isSuccess()) && f.isOpen())
+                        f.close();
+                });
+                logger.trace("{} of {} (toRead {} cs {})", bytesTransferred, length, toRead, f.isOpen());
+            }
+            return bytesTransferred;
+        }
+        catch (Exception e)
+        {
+            if (f.isOpen())
+                f.close();
+            throw e;
+        }
+    }
+
@Override
protected void doFlush(int count) throws IOException
{
@@ -145,7 +197,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
currentBuf.writerIndex(byteCount);
if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
- throw new IOException("outbound channel was not writable");
+ throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", byteCount));
channel.writeAndFlush(currentBuf).addListener(future -> handleBuffer(future, byteCount));
currentBuf = channel.alloc().directBuffer(bufferSize, bufferSize);
@@ -161,7 +213,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
private void handleBuffer(Future<? super Void> future, int bytesWritten)
{
channelRateLimiter.release(bytesWritten);
-
+        // Netty's Future.cause() is null for a successful write, so "because null" is expected on success
+        logger.trace("bytesWritten {} {} because {}", bytesWritten, future.isSuccess() ? "Succeeded" : "Failed", future.cause());
if (!future.isSuccess() && channel.isOpen())
session.onError(future.cause());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
new file mode 100644
index 0000000..46f0ce1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import io.netty.channel.DefaultFileRegion;
+
+/**
+ * Netty's DefaultFileRegion closes the underlying FileChannel as soon as
+ * the refCnt() for the region drops to zero. This subclass leaves a
+ * caller-supplied FileChannel open, so several regions can be cut from one
+ * channel whose lifetime is managed by the caller.
+ *
+ * A region created from a {@link File} opens its own channel, which the
+ * caller never sees; that channel is still closed on deallocation so it
+ * cannot leak.
+ *
+ * See {@link ByteBufDataOutputStreamPlus} for its usage.
+ */
+public class NonClosingDefaultFileRegion extends DefaultFileRegion
+{
+    /** true only when this region opened the file itself and therefore owns the channel */
+    private final boolean ownsChannel;
+
+    public NonClosingDefaultFileRegion(FileChannel file, long position, long count)
+    {
+        super(file, position, count);
+        this.ownsChannel = false;
+    }
+
+    public NonClosingDefaultFileRegion(File f, long position, long count)
+    {
+        super(f, position, count);
+        this.ownsChannel = true;
+    }
+
+    @Override
+    protected void deallocate()
+    {
+        // A caller-supplied channel is deliberately left open for the caller to close;
+        // a channel this region opened from a File has no other owner, so close it as usual.
+        if (ownsChannel)
+            super.deallocate();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
index 1f32aa8..4e667da 100644
--- a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
@@ -31,6 +31,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelConfig;
import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.RebufferingInputStream;
public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
@@ -249,4 +250,42 @@ public class RebufferingByteBufDataInputPlus extends RebufferingInputStream impl
{
return channelConfig.getAllocator();
}
+
+    /**
+     * Copies exactly {@code len} bytes from this stream to the channel behind {@code writer},
+     * refilling the internal buffer with {@link #reBuffer()} whenever it runs dry.
+     *
+     * @param writer destination; bytes are written directly to its channel via {@code applyToChannel}
+     * @param len number of bytes to copy
+     * @return the number of bytes copied
+     * @throws EOFException if the stream ends before {@code len} bytes have been read
+     * @throws IOException if reading from this stream or writing to the channel fails
+     */
+    public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException
+    {
+        long copied = 0; // number of bytes copied
+        while (copied < len)
+        {
+            if (buffer.remaining() == 0)
+            {
+                try
+                {
+                    reBuffer();
+                }
+                catch (EOFException e)
+                {
+                    throw new EOFException("EOF after " + copied + " bytes out of " + len);
+                }
+                if (buffer.remaining() == 0)
+                    throw new AssertionError("reBuffer() failed to return data");
+            }
+
+            int originalLimit = buffer.limit();
+            int toCopy = (int) Math.min(len - copied, buffer.remaining());
+            // Narrow the limit temporarily so the channel write cannot consume more than len bytes,
+            // and always restore it, even if the write fails.
+            buffer.limit(buffer.position() + toCopy);
+            int written;
+            try
+            {
+                written = writer.applyToChannel(c -> c.write(buffer));
+            }
+            finally
+            {
+                buffer.limit(originalLimit);
+            }
+            copied += written;
+        }
+
+        return copied;
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 34b0bbd..7eada28 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -282,7 +282,8 @@ public class StreamCoordinator
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), pendingRepair, previewKind);
+ StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(),
+ pendingRepair, previewKind);
streamSessions.put(++lastReturned, session);
return session;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 49beba1..0a96f4c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -49,6 +49,7 @@ public class StreamReceiveTask extends StreamTask
private volatile boolean done = false;
private int remoteStreamsReceived = 0;
+ private long bytesReceived = 0;
public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize)
{
@@ -76,8 +77,10 @@ public class StreamReceiveTask extends StreamTask
}
remoteStreamsReceived++;
+ bytesReceived += stream.getSize();
Preconditions.checkArgument(tableId.equals(stream.getTableId()));
- logger.debug("recevied {} of {} total files", remoteStreamsReceived, totalStreams);
+ logger.debug("received {} of {} total files {} of total bytes {}", remoteStreamsReceived, totalStreams,
+ bytesReceived, totalSize);
receiver.received(stream);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ef8976d..4de63be 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,10 +17,8 @@
*/
package org.apache.cassandra.streaming;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.*;
+import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.util.concurrent.AbstractFuture;
@@ -78,7 +76,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
}
- static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
+ public static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
StreamCoordinator coordinator)
{
StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator);
@@ -112,8 +110,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
{
- logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}",
- planId, sessionIndex, streamOperation.getDescription(), from, channel.remoteAddress(), channel.localAddress(), channel.id());
+ logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {}" +
+ " channel.id {}", planId, sessionIndex, streamOperation.getDescription(),
+ from, channel.remoteAddress(), channel.localAddress(), channel.id());
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c56616e..393cd24 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -202,7 +202,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
this.pendingRepair = pendingRepair;
this.previewKind = previewKind;
- logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer, preferredPeerInetAddressAndPort);
+ logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer,
+ preferredPeerInetAddressAndPort);
}
public UUID planId()
@@ -777,7 +778,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
FBUtilities.waitOnFutures(flushes);
}
- private synchronized void prepareReceiving(StreamSummary summary)
+ @VisibleForTesting
+ public synchronized void prepareReceiving(StreamSummary summary)
{
failIfFinished();
if (summary.files > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index bff77cf..3fa80f5 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -322,7 +322,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
// close the DataOutputStreamPlus as we're done with it - but don't close the channel
- try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 16))
+ try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 20))
{
StreamMessage.serialize(msg, outPlus, protocolVersion, session);
channel.flush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index 81fe8cd..7c10ef9 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -62,7 +62,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
- private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
+ private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 20;
private final InetAddressAndPort remoteAddress;
private final int protocolVersion;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index fbd3e21..a591a43 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -47,7 +47,8 @@ public class StreamInitMessage extends StreamMessage
public final UUID pendingRepair;
public final PreviewKind previewKind;
- public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
+ public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation,
+ UUID pendingRepair, PreviewKind previewKind)
{
super(Type.STREAM_INIT);
this.from = from;
@@ -93,7 +94,8 @@ public class StreamInitMessage extends StreamMessage
UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
- return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind);
+ return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description),
+ pendingRepair, previewKind);
}
public long serializedSize(StreamInitMessage message, int version)
@@ -108,6 +110,7 @@ public class StreamInitMessage extends StreamMessage
size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
}
size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/utils/Collectors3.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Collectors3.java b/src/java/org/apache/cassandra/utils/Collectors3.java
new file mode 100644
index 0000000..f8f262e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Collectors3.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collector;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Extra {@link Collector} implementations that gather stream elements into Guava immutable collections.
+ *
+ * Named Collectors3 just in case Guava ever makes a Collectors2
+ */
+public class Collectors3
+{
+    // The list collector declares no special characteristics.
+    private static final Collector.Characteristics[] LIST_CHARACTERISTICS = new Collector.Characteristics[0];
+    // The set collector is declared UNORDERED: callers are not promised any element order.
+    private static final Collector.Characteristics[] SET_CHARACTERISTICS = { Collector.Characteristics.UNORDERED };
+
+    /**
+     * Returns a collector that accumulates elements into an {@link ImmutableList} via its builder.
+     */
+    public static <T> Collector<T, ?, List<T>> toImmutableList()
+    {
+        return Collector.of(() -> ImmutableList.<T>builder(),
+                            (ImmutableList.Builder<T> acc, T item) -> acc.add(item),
+                            (ImmutableList.Builder<T> acc, ImmutableList.Builder<T> other) -> acc.addAll(other.build()),
+                            (ImmutableList.Builder<T> acc) -> acc.build(),
+                            LIST_CHARACTERISTICS);
+    }
+
+    /**
+     * Returns a collector that accumulates elements into an {@link ImmutableSet} via its builder.
+     */
+    public static <T> Collector<T, ?, Set<T>> toImmutableSet()
+    {
+        return Collector.of(() -> ImmutableSet.<T>builder(),
+                            (ImmutableSet.Builder<T> acc, T item) -> acc.add(item),
+                            (ImmutableSet.Builder<T> acc, ImmutableSet.Builder<T> other) -> acc.addAll(other.build()),
+                            (ImmutableSet.Builder<T> acc) -> acc.build(),
+                            SET_CHARACTERISTICS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 5893bab..3c09637 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -46,3 +46,5 @@ enable_user_defined_functions: true
enable_scripted_user_defined_functions: true
prepared_statements_cache_size_mb: 1
corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound_megabits_per_sec: 200000000
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index bd7ef20..01e67f0 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -29,10 +29,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Keyspace;
@@ -41,6 +38,9 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
new file mode 100644
index 0000000..3192bcc
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.db.streaming.CassandraStreamHeader;
+import org.apache.cassandra.db.streaming.CassandraStreamReader;
+import org.apache.cassandra.db.streaming.CassandraStreamWriter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec set to a
+ * really high value; otherwise throttling will kick in and the results will not be meaningful.
+ */
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+public class ZeroCopyStreamingBenchmark
+{
+    static final int STREAM_SIZE = 50 * 1024 * 1024;
+
+    @State(Scope.Thread)
+    public static class BenchmarkState
+    {
+        public static final String KEYSPACE = "ZeroCopyStreamingBenchmark";
+        public static final String CF_STANDARD = "Standard1";
+        public static final String CF_INDEXED = "Indexed1";
+        public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+        private static SSTableReader sstable;
+        private static ColumnFamilyStore store;
+        private StreamSession session;
+        private CassandraEntireSSTableStreamWriter blockStreamWriter;
+        private ByteBuf serializedBlockStream;
+        private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        private CassandraEntireSSTableStreamReader blockStreamReader;
+        private CassandraStreamWriter partialStreamWriter;
+        private CassandraStreamReader partialStreamReader;
+        private ByteBuf serializedPartialStream;
+
+        @Setup
+        public void setupBenchmark() throws IOException
+        {
+            Keyspace keyspace = setupSchemaAndKeySpace();
+            store = keyspace.getColumnFamilyStore(CF_STANDARD);
+            generateData();
+
+            sstable = store.getLiveSSTables().iterator().next();
+            session = setupStreamingSessionForTest();
+            blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+            // Close the (buffered) output stream before taking the capture, so that any bytes it still
+            // holds are pushed to the channel and end up in the serialized stream.
+            CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE);
+            try (ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, blockStreamCaptureChannel, 1024 * 1024))
+            {
+                blockStreamWriter.write(out);
+            }
+            serializedBlockStream = blockStreamCaptureChannel.getSerializedStream();
+
+            session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, serializedBlockStream.readableBytes()));
+
+            CassandraStreamHeader entireSSTableStreamHeader =
+                CassandraStreamHeader.builder()
+                                     .withSSTableFormat(sstable.descriptor.formatType)
+                                     .withSSTableVersion(sstable.descriptor.version)
+                                     .withSSTableLevel(0)
+                                     .withEstimatedKeys(sstable.estimatedKeys())
+                                     .withSections(Collections.emptyList())
+                                     .withSerializationHeader(sstable.header.toComponent())
+                                     .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                     .isEntireSSTable(true)
+                                     .withFirstKey(sstable.first)
+                                     .withTableId(sstable.metadata().id)
+                                     .build();
+
+            blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id,
+                                                                                               peer, session.planId(),
+                                                                                               0, 0, 0,
+                                                                                               null), entireSSTableStreamHeader, session);
+
+            List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken()));
+            partialStreamWriter = new CassandraStreamWriter(sstable, sstable.getPositionsForRanges(requestedRanges), session);
+
+            // Same as above: the stream is closed (not leaked) and flushed before the capture is taken.
+            CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE);
+            try (ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, partialStreamChannel, 1024 * 1024))
+            {
+                partialStreamWriter.write(out);
+            }
+            serializedPartialStream = partialStreamChannel.getSerializedStream();
+
+            CassandraStreamHeader partialSSTableStreamHeader =
+                CassandraStreamHeader.builder()
+                                     .withSSTableFormat(sstable.descriptor.formatType)
+                                     .withSSTableVersion(sstable.descriptor.version)
+                                     .withSSTableLevel(0)
+                                     .withEstimatedKeys(sstable.estimatedKeys())
+                                     .withSections(sstable.getPositionsForRanges(requestedRanges))
+                                     .withSerializationHeader(sstable.header.toComponent())
+                                     .withTableId(sstable.metadata().id)
+                                     .build();
+
+            partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
+                                                                                    peer, session.planId(),
+                                                                                    0, 0, 0,
+                                                                                    null),
+                                                            partialSSTableStreamHeader, session);
+        }
+
+        private Keyspace setupSchemaAndKeySpace()
+        {
+            SchemaLoader.prepareServer();
+            SchemaLoader.createKeyspace(KEYSPACE,
+                                        KeyspaceParams.simple(1),
+                                        SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                        SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                        SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                    .minIndexInterval(8)
+                                                    .maxIndexInterval(256)
+                                                    .caching(CachingParams.CACHE_NOTHING));
+
+            return Keyspace.open(KEYSPACE);
+        }
+
+        private void generateData()
+        {
+            // insert data and compact to a single sstable
+            CompactionManager.instance.disableAutoCompaction();
+            for (int j = 0; j < 1_000_000; j++)
+            {
+                new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+                .clustering("0")
+                .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                .build()
+                .applyUnsafe();
+            }
+            store.forceBlockingFlush();
+            CompactionManager.instance.performMaximal(store, false);
+        }
+
+        @TearDown
+        public void tearDown() throws IOException
+        {
+            SchemaLoader.cleanupAndLeaveDirs();
+            CommitLog.instance.stopUnsafe(true);
+        }
+
+        private StreamSession setupStreamingSessionForTest()
+        {
+            StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+            StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+
+            InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+            streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+            StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+            session.init(future);
+            return session;
+        }
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void blockStreamWriter(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024);
+        state.blockStreamWriter.write(out);
+        out.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void blockStreamReader(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config());
+        in.append(state.serializedBlockStream.retainedDuplicate());
+        SSTableMultiWriter sstableWriter = state.blockStreamReader.read(in);
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+        in.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void partialStreamWriter(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024);
+        state.partialStreamWriter.write(out);
+        out.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void partialStreamReader(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config());
+        in.append(state.serializedPartialStream.retainedDuplicate());
+        SSTableMultiWriter sstableWriter = state.partialStreamReader.read(in);
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+        in.close();
+        channel.finishAndReleaseAll();
+    }
+
+    private EmbeddedChannel createMockNettyChannel()
+    {
+        EmbeddedChannel channel = new EmbeddedChannel();
+        channel.config().setWriteBufferHighWaterMark(STREAM_SIZE); // avoid blocking
+        return channel;
+    }
+
+    /**
+     * An {@link EmbeddedChannel} that records every byte written to it into a single buffer,
+     * retrievable with {@link #getSerializedStream()}.
+     */
+    private static class CapturingNettyChannel extends EmbeddedChannel
+    {
+        private final ByteBuf serializedStream;
+        private final WritableByteChannel proxyWBC = new WritableByteChannel()
+        {
+            public int write(ByteBuffer src) throws IOException
+            {
+                int rem = src.remaining();
+                serializedStream.writeBytes(src);
+                return rem;
+            }
+
+            public boolean isOpen()
+            {
+                return true;
+            }
+
+            public void close() throws IOException
+            {
+            }
+        };
+
+        public CapturingNettyChannel(int capacity)
+        {
+            this.serializedStream = alloc().buffer(capacity);
+            this.pipeline().addLast(new ChannelOutboundHandlerAdapter()
+            {
+                @Override
+                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+                {
+                    // The message is consumed here instead of being passed down the pipeline, so this handler
+                    // must complete the promise (the writer's listeners hand back channel permits and close
+                    // the source file on completion) and release reference-counted messages itself.
+                    try
+                    {
+                        if (msg instanceof ByteBuf)
+                            serializedStream.writeBytes((ByteBuf) msg);
+                        else if (msg instanceof ByteBuffer)
+                            serializedStream.writeBytes((ByteBuffer) msg);
+                        else if (msg instanceof DefaultFileRegion)
+                            ((DefaultFileRegion) msg).transferTo(proxyWBC, 0);
+                        promise.trySuccess();
+                    }
+                    catch (Throwable t)
+                    {
+                        promise.tryFailure(t);
+                    }
+                    finally
+                    {
+                        if (msg instanceof ByteBuf)
+                            ((ByteBuf) msg).release();
+                        else if (msg instanceof DefaultFileRegion)
+                            ((DefaultFileRegion) msg).release();
+                    }
+                }
+            });
+            config().setWriteBufferHighWaterMark(capacity);
+        }
+
+        public ByteBuf getSerializedStream()
+        {
+            return serializedStream.copy();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index c9dbe14..0632274 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -612,15 +612,15 @@ public class VerifyTest
Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
- roh.check(dk(1));
- roh.check(dk(10));
- roh.check(dk(11));
- roh.check(dk(21));
- roh.check(dk(25));
+ roh.validate(dk(1));
+ roh.validate(dk(10));
+ roh.validate(dk(11));
+ roh.validate(dk(21));
+ roh.validate(dk(25));
boolean gotException = false;
try
{
- roh.check(dk(26));
+ roh.validate(dk(26));
}
catch (Throwable t)
{
@@ -635,9 +635,9 @@ public class VerifyTest
List<Range<Token>> normalized = new ArrayList<>();
normalized.add(r(0,10));
Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
- roh.check(dk(1));
+ roh.validate(dk(1));
// call with smaller token to get exception
- roh.check(dk(0));
+ roh.validate(dk(0));
}
@@ -646,9 +646,9 @@ public class VerifyTest
{
List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0,0)));
Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
- roh.check(dk(Long.MIN_VALUE));
- roh.check(dk(0));
- roh.check(dk(Long.MAX_VALUE));
+ roh.validate(dk(Long.MIN_VALUE));
+ roh.validate(dk(0));
+ roh.validate(dk(Long.MAX_VALUE));
}
@Test
@@ -656,12 +656,12 @@ public class VerifyTest
{
List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(Long.MAX_VALUE - 1000,Long.MIN_VALUE + 1000)));
Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
- roh.check(dk(Long.MIN_VALUE));
- roh.check(dk(Long.MAX_VALUE));
+ roh.validate(dk(Long.MIN_VALUE));
+ roh.validate(dk(Long.MAX_VALUE));
boolean gotException = false;
try
{
- roh.check(dk(26));
+ roh.validate(dk(26));
}
catch (Throwable t)
{
@@ -673,7 +673,7 @@ public class VerifyTest
@Test
public void testEmptyRanges()
{
- new Verifier.RangeOwnHelper(Collections.emptyList()).check(dk(1));
+ new Verifier.RangeOwnHelper(Collections.emptyList()).validate(dk(1));
}
private DecoratedKey dk(long l)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
new file mode 100644
index 0000000..947f968
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.ByteBufDataInputPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.NonClosingDefaultFileRegion;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Round-trips an SSTable through {@link CassandraEntireSSTableStreamWriter} and
+ * {@link CassandraEntireSSTableStreamReader} over a Netty {@link EmbeddedChannel}.
+ */
+public class CassandraEntireSSTableStreamWriterTest
+{
+ public static final String KEYSPACE = "CassandraEntireSSTableStreamWriterTest";
+ public static final String CF_STANDARD = "Standard1";
+ public static final String CF_INDEXED = "Indexed1";
+ public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+ private static SSTableReader sstable;
+ private static ColumnFamilyStore store;
+
+ /**
+ * Creates the test keyspace, writes ten rows into {@link #CF_STANDARD} and compacts them
+ * into the single SSTable used by every test.
+ */
+ @BeforeClass
+ public static void defineSchemaAndPrepareSSTable()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+ SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+ .minIndexInterval(8)
+ .maxIndexInterval(256)
+ .caching(CachingParams.CACHE_NOTHING));
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ sstable = store.getLiveSSTables().iterator().next();
+ }
+
+ /**
+ * The first message the writer puts on the channel should be a {@link DefaultFileRegion}
+ * (zero-copy transfer) rather than a copied buffer.
+ */
+ @Test
+ public void testBlockWriterOverWire() throws IOException
+ {
+ StreamSession session = setupStreamingSessionForTest();
+
+ CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+ EmbeddedChannel channel = new EmbeddedChannel();
+ ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024);
+ writer.write(out);
+
+ Queue<Object> msgs = channel.outboundMessages();
+
+ assertTrue(msgs.peek() instanceof DefaultFileRegion);
+ }
+
+ /**
+ * Serializes the whole SSTable through a capturing channel, then reads the captured bytes back
+ * with the entire-SSTable reader and checks that exactly one SSTable is produced.
+ */
+ @Test
+ public void testBlockReadingAndWritingOverWire() throws Exception
+ {
+ StreamSession session = setupStreamingSessionForTest();
+ InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+
+ CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+ // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
+ ByteBuf serializedFile = Unpooled.buffer(8192);
+ EmbeddedChannel channel = createMockNettyChannel(serializedFile);
+ ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024);
+
+ writer.write(out);
+
+ session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+
+ CassandraStreamHeader header =
+ CassandraStreamHeader.builder()
+ .withSSTableFormat(sstable.descriptor.formatType)
+ .withSSTableVersion(sstable.descriptor.version)
+ .withSSTableLevel(0)
+ .withEstimatedKeys(sstable.estimatedKeys())
+ .withSections(Collections.emptyList())
+ .withSerializationHeader(sstable.header.toComponent())
+ .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+ .isEntireSSTable(true)
+ .withFirstKey(sstable.first)
+ .withTableId(sstable.metadata().id)
+ .build();
+
+ CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), 0, 0, 0, null), header, session);
+
+ SSTableMultiWriter sstableWriter = reader.read(new ByteBufDataInputPlus(serializedFile));
+ Collection<SSTableReader> newSstables = sstableWriter.finished();
+
+ assertEquals(1, newSstables.size());
+ }
+
+ /**
+ * Builds a channel whose outbound handler copies every written file region into {@code serializedFile}
+ * before passing the message on, so the bytes stay readable after Netty has processed the write.
+ */
+ private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) throws Exception
+ {
+ WritableByteChannel wbc = new WritableByteChannel()
+ {
+ private boolean isOpen = true;
+
+ public int write(ByteBuffer src) throws IOException
+ {
+ // report the bytes actually consumed (position..limit), as the WritableByteChannel contract requires
+ int size = src.remaining();
+ serializedFile.writeBytes(src);
+ return size;
+ }
+
+ public boolean isOpen()
+ {
+ return isOpen;
+ }
+
+ public void close() throws IOException
+ {
+ isOpen = false;
+ }
+ };
+
+ return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+ {
+ transferFully((NonClosingDefaultFileRegion) msg, wbc);
+ super.write(ctx, msg, promise);
+ }
+ });
+ }
+
+ /**
+ * Transfers every byte of {@code region} into {@code target}; a single transferTo call
+ * is allowed to transfer fewer bytes than requested.
+ */
+ private static void transferFully(DefaultFileRegion region, WritableByteChannel target) throws IOException
+ {
+ long position = 0;
+ while (position < region.count())
+ {
+ long written = region.transferTo(target, position);
+ if (written <= 0)
+ throw new IOException("No progress transferring file region at position " + position + " of " + region.count());
+ position += written;
+ }
+ }
+
+ /** Creates a BOOTSTRAP stream session against the local broadcast address for the tests to use. */
+ private StreamSession setupStreamingSessionForTest()
+ {
+ StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+ StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+
+ InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+ streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+ StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+ session.init(future);
+ return session;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/3] cassandra git commit: Stream entire SSTables when possible
Posted by al...@apache.org.
Stream entire SSTables when possible
patch by Dinesh Joshi; reviewed by Aleksey Yeschenko and Ariel Weisberg
for CASSANDRA-14556
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47a12c52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47a12c52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47a12c52
Branch: refs/heads/trunk
Commit: 47a12c52a313258307ab88392f75c5866d9a2bb1
Parents: 6ba2fb9
Author: Dinesh A. Joshi <di...@apple.com>
Authored: Tue Jul 3 12:07:11 2018 -0700
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Fri Jul 27 17:50:25 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +
conf/cassandra.yaml | 12 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 14 +
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../org/apache/cassandra/db/DiskBoundaries.java | 17 +-
.../cassandra/db/compaction/Verifier.java | 26 +-
.../apache/cassandra/db/lifecycle/LogFile.java | 2 +-
.../cassandra/db/lifecycle/LogReplicaSet.java | 3 +-
.../CassandraCompressedStreamReader.java | 131 ++++++++
.../CassandraCompressedStreamWriter.java | 152 +++++++++
.../CassandraEntireSSTableStreamReader.java | 177 ++++++++++
.../CassandraEntireSSTableStreamWriter.java | 120 +++++++
.../db/streaming/CassandraIncomingFile.java | 17 +-
.../db/streaming/CassandraOutgoingFile.java | 132 +++++++-
.../db/streaming/CassandraStreamHeader.java | 250 ++++++++++++--
.../db/streaming/CassandraStreamManager.java | 4 +-
.../db/streaming/CassandraStreamReader.java | 5 +-
.../db/streaming/ComponentManifest.java | 130 ++++++++
.../CompressedCassandraStreamReader.java | 131 --------
.../CompressedCassandraStreamWriter.java | 152 ---------
.../db/streaming/CompressedInputStream.java | 4 +-
.../cassandra/db/streaming/IStreamReader.java | 32 ++
.../apache/cassandra/io/sstable/Component.java | 7 +-
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../format/big/BigTableZeroCopyWriter.java | 226 +++++++++++++
.../io/util/BufferedDataOutputStreamPlus.java | 4 +-
.../cassandra/io/util/CheckedFunction.java | 25 ++
.../cassandra/io/util/DataOutputPlus.java | 4 +-
.../io/util/RebufferingInputStream.java | 4 +-
.../cassandra/io/util/SequentialWriter.java | 17 +-
.../io/util/UnbufferedDataOutputStreamPlus.java | 2 +-
.../net/async/ByteBufDataOutputPlus.java | 5 +-
.../net/async/ByteBufDataOutputStreamPlus.java | 60 +++-
.../net/async/NonClosingDefaultFileRegion.java | 51 +++
.../async/RebufferingByteBufDataInputPlus.java | 39 +++
.../cassandra/streaming/StreamCoordinator.java | 3 +-
.../cassandra/streaming/StreamReceiveTask.java | 5 +-
.../cassandra/streaming/StreamResultFuture.java | 13 +-
.../cassandra/streaming/StreamSession.java | 6 +-
.../async/NettyStreamingMessageSender.java | 2 +-
.../async/StreamingInboundHandler.java | 2 +-
.../streaming/messages/StreamInitMessage.java | 7 +-
.../org/apache/cassandra/utils/Collectors3.java | 54 +++
test/conf/cassandra.yaml | 2 +
.../cassandra/streaming/LongStreamingTest.java | 6 +-
.../microbench/ZeroCopyStreamingBenchmark.java | 329 +++++++++++++++++++
.../org/apache/cassandra/db/VerifyTest.java | 30 +-
.../CassandraEntireSSTableStreamWriterTest.java | 209 ++++++++++++
.../db/streaming/CassandraOutgoingFileTest.java | 145 ++++++++
.../db/streaming/CassandraStreamHeaderTest.java | 62 +++-
.../db/streaming/ComponentManifestTest.java | 64 ++++
.../io/sstable/BigTableWriterTest.java | 5 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +
.../format/big/BigTableZeroCopyWriterTest.java | 208 ++++++++++++
.../RebufferingByteBufDataInputPlusTest.java | 98 ++++++
.../serializers/SerializationUtils.java | 1 -
.../streaming/StreamTransferTaskTest.java | 4 +-
.../streaming/StreamingTransferTest.java | 1 +
60 files changed, 2809 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b6418c..6ede70e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Stream entire SSTables when possible (CASSANDRA-14556)
* Add experimental support for Java 11 (CASSANDRA-9608)
* Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580)
* Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 75885e9..3fab849 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -91,6 +91,9 @@ New features
statements after the cache has been refreshed. CASSANDRA-13985
- Support for audit logging of database activity. If enabled, logs every incoming
CQL command request, Authentication (successful as well as unsuccessful login) to a node.
+ - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, Cassandra will stream
+ entire SSTables, significantly speeding up transfers. Any streaming-related operation will see a
+ corresponding improvement. See CASSANDRA-14556.
Upgrading
---------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 439b85a..663daaa 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -788,6 +788,18 @@ compaction_throughput_mb_per_sec: 16
# between the sstables, reducing page cache churn and keeping hot rows hot
sstable_preemptive_open_interval_in_mb: 50
+# When enabled, permits Cassandra to zero-copy stream entire eligible
+# SSTables between nodes, including every component.
+# This speeds up the network transfer significantly subject to
+# throttling specified by stream_throughput_outbound_megabits_per_sec.
+# Enabling this will reduce the GC pressure on the sending and receiving nodes.
+# When unset, the default is enabled. While this feature tries to keep the
+# disks balanced, it cannot guarantee it. This feature will be automatically
+# disabled if internode encryption is enabled. Currently this can be used with
+# Leveled Compaction. Once CASSANDRA-14586 is fixed other compaction strategies
+# will benefit as well when used in combination with CASSANDRA-6696.
+# stream_entire_sstables: true
+
# Throttles all outbound streaming file transfers on this node to the
# given total throughput in Mbps. This is necessary because Cassandra does
# mostly sequential IO when streaming data during bootstrap or repair, which
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0d4760e..3a7ff0d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -381,6 +381,7 @@ public class Config
public int block_for_peers_timeout_in_secs = 10;
public volatile boolean automatic_sstable_upgrade = false;
public volatile int max_concurrent_automatic_sstable_upgrades = 1;
+ public boolean stream_entire_sstables = true;
public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6301ab0..366dac7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -714,6 +714,15 @@ public class DatabaseDescriptor
"server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false);
}
+ if (conf.stream_entire_sstables)
+ {
+ if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional)
+ {
+ logger.warn("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming.");
+ conf.stream_entire_sstables = false;
+ }
+ }
+
if (conf.max_value_size_in_mb <= 0)
throw new ConfigurationException("max_value_size_in_mb must be positive", false);
else if (conf.max_value_size_in_mb >= 2048)
@@ -2274,6 +2283,11 @@ public class DatabaseDescriptor
return conf.streaming_connections_per_host;
}
+ /**
+ * @return whether entire SSTables may be streamed (zero-copy). Note that {@code stream_entire_sstables}
+ * is reset to false during configuration setup when server encryption is enabled or optional.
+ */
+ public static boolean streamEntireSSTables()
+ {
+ return conf.stream_entire_sstables;
+ }
+
public static String getLocalDataCenter()
{
return localDC;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9c4921e..f03ffe6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -791,7 +791,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return newSSTableDescriptor(directory, format.info.getLatestVersion(), format);
}
- private Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format)
+ public Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format)
{
return new Descriptor(version,
directory,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index 086bc84..22f17b0 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -129,4 +129,19 @@ public class DiskBoundaries
{
return directories.get(getDiskIndex(sstable));
}
-}
\ No newline at end of file
+
+ /**
+ * Returns the data directory at the key's insertion point among the disk boundaries,
+ * or {@code null} when no boundaries ({@code positions}) are defined.
+ */
+ public Directories.DataDirectory getCorrectDiskForKey(DecoratedKey key)
+ {
+ return positions == null ? null : directories.get(getDiskIndex(key));
+ }
+
+ /**
+ * Returns the index of the disk for {@code key}: the key's insertion point in {@code positions}.
+ */
+ private int getDiskIndex(DecoratedKey key)
+ {
+ int searchResult = Collections.binarySearch(positions, key);
+ // an exact hit is not expected (presumably the boundaries are key bounds, not real keys — confirm)
+ assert searchResult < 0;
+ // binarySearch encodes a miss as (-(insertion point) - 1); decode the insertion point
+ return -(searchResult + 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index bc9679d..db49369 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -180,7 +180,7 @@ public class Verifier implements Closeable
while (iter.hasNext())
{
DecoratedKey key = iter.next();
- rangeOwnHelper.check(key);
+ rangeOwnHelper.validate(key);
}
}
catch (Throwable t)
@@ -262,7 +262,7 @@ public class Verifier implements Closeable
{
try
{
- rangeOwnHelper.check(key);
+ rangeOwnHelper.validate(key);
}
catch (Throwable t)
{
@@ -360,13 +360,27 @@ public class Verifier implements Closeable
* @param key the key
* @throws RuntimeException if the key is not contained
*/
- public void check(DecoratedKey key)
+ public void validate(DecoratedKey key)
+ {
+ if (!check(key))
+ throw new RuntimeException("Key " + key + " is not contained in the given ranges");
+ }
+
+ /**
+ * check if the given key is contained in any of the given ranges
+ *
+ * Must be called in sorted order - key should be increasing
+ *
+ * @param key the key
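+ * @return true if the key is contained in one of the ranges (or no ranges were given), false otherwise
+ * @throws IllegalStateException if called again after a key has already been found outside every range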
+ */
+ public boolean check(DecoratedKey key)
{
assert lastKey == null || key.compareTo(lastKey) > 0;
lastKey = key;
if (normalizedRanges.isEmpty()) // handle tests etc where we don't have any ranges
- return;
+ return true;
if (rangeIndex > normalizedRanges.size() - 1)
throw new IllegalStateException("RangeOwnHelper can only be used to find the first out-of-range-token");
@@ -375,8 +389,10 @@ public class Verifier implements Closeable
{
rangeIndex++;
if (rangeIndex > normalizedRanges.size() - 1)
- throw new RuntimeException("Key "+key+" is not contained in the given ranges");
+ return false;
}
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index af6f435..98be0a0 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -66,7 +66,7 @@ final class LogFile implements AutoCloseable
private final LogReplicaSet replicas = new LogReplicaSet();
// The transaction records, this set must be ORDER PRESERVING
- private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+ private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
// The type of the transaction
private final OperationType type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 65be285..f5423d6 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -45,7 +46,7 @@ public class LogReplicaSet implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(LogReplicaSet.class);
- private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
+ private final Map<File, LogReplica> replicasByFile = Collections.synchronizedMap(new LinkedHashMap<>()); // TODO: Hack until we fix CASSANDRA-14554
private Collection<LogReplica> replicas()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
new file mode 100644
index 0000000..eb993ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
+/**
+ * {@link CassandraStreamReader} for a compressed SSTable streamed as a list of partition sections.
+ * The raw input is wrapped in a {@link CompressedInputStream} (CRC32 checksums, checked according to the
+ * table's CRC check chance) and every partition of every section is written into a new local SSTable.
+ */
+public class CassandraCompressedStreamReader extends CassandraStreamReader
+{
+ private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamReader.class);
+
+ // chunk metadata for the compressed data being received, taken from the stream header
+ protected final CompressionInfo compressionInfo;
+
+ public CassandraCompressedStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+ {
+ super(header, streamHeader, session);
+ this.compressionInfo = streamHeader.compressionInfo;
+ }
+
+ /**
+ * Receives the compressed sections described by the stream header and writes their partitions into a new SSTable.
+ *
+ * @param inputPlus the raw (compressed) input from the peer
+ * @return the writer the received partitions were written to
+ * @throws java.io.IOException if the table was dropped during streaming, or if reading the remote sstable fails
+ * (failures for which extractIOExceptionCause finds an IOException are rethrown as-is). Other failures,
+ * such as local write errors, are rethrown unchecked. In every failure case a created writer is aborted first.
+ */
+ @Override
+ @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+ public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+ {
+ long totalSize = totalSize();
+
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+ if (cfs == null)
+ {
+ // schema was dropped during streaming
+ throw new IOException("CF " + tableId + " was dropped during streaming");
+ }
+
+ logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
+ session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
+ cfs.getTableName());
+
+ // declared outside the try so the catch block can report the current partition and abort the writer
+ StreamDeserializer deserializer = null;
+ SSTableMultiWriter writer = null;
+ try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
+ {
+ TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
+ deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
+ writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
+ String filename = writer.getFilename();
+ int sectionIdx = 0;
+ for (SSTableReader.PartitionPositionBounds section : sections)
+ {
+ // sanity check: never read more compressed bytes than the announced chunks add up to
+ assert cis.getTotalCompressedBytesRead() <= totalSize;
+ long sectionLength = section.upperPosition - section.lowerPosition;
+
+ logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
+ // skip to beginning of section inside chunk
+ cis.position(section.lowerPosition);
+ // count (uncompressed) bytes from the start of this section so the loop below stops at sectionLength
+ in.reset(0);
+
+ while (in.getBytesRead() < sectionLength)
+ {
+ writePartition(deserializer, writer);
+ // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
+ session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+ }
+ }
+ logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+ session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
+ return writer;
+ }
+ catch (Throwable e)
+ {
+ Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
+ logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+ session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
+ if (writer != null)
+ {
+ writer.abort(e);
+ }
+ // IO-related failures propagate unchanged; anything else is rethrown unchecked
+ if (extractIOExceptionCause(e).isPresent())
+ throw e;
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * @return the number of bytes expected on the wire: every compressed chunk's length plus its 4-byte CRC
+ */
+ @Override
+ protected long totalSize()
+ {
+ long size = 0;
+ // calculate total length of transferring chunks
+ for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+ size += chunk.length + 4; // 4 bytes for CRC
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
new file mode 100644
index 0000000..3b971f8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraStreamWriter for compressed SSTable.
+ */
+public class CassandraCompressedStreamWriter extends CassandraStreamWriter
+{
+ private static final int CHUNK_SIZE = 1 << 16;
+
+ private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
+
+ private final CompressionInfo compressionInfo;
+
+ public CassandraCompressedStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
+ {
+ super(sstable, sections, session);
+ this.compressionInfo = compressionInfo;
+ }
+
+ @Override
+ public void write(DataOutputStreamPlus out) throws IOException
+ {
+ assert out instanceof ByteBufDataOutputStreamPlus;
+ ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
+ long totalSize = totalSize();
+ logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
+ try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+ {
+ long progress = 0L;
+ // calculate chunks to transfer. we want to send continuous chunks altogether.
+ List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
+
+ int sectionIdx = 0;
+
+ // stream each of the required sections of the file
+ for (final SSTableReader.PartitionPositionBounds section : sections)
+ {
+ // length of the section to stream
+ long length = section.upperPosition - section.lowerPosition;
+
+ logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
+ // tracks write progress
+ long bytesTransferred = 0;
+ while (bytesTransferred < length)
+ {
+ final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+ limiter.acquire(toTransfer);
+
+ ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
+ long lastWrite;
+ try
+ {
+ lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred);
+ assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
+ outBuffer.flip();
+ output.writeToChannel(outBuffer);
+ }
+ catch (IOException e)
+ {
+ FileUtils.clean(outBuffer);
+ throw e;
+ }
+
+ bytesTransferred += lastWrite;
+ progress += lastWrite;
+ session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+ }
+ }
+ logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+ session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
+ }
+ }
+
+ @Override
+ protected long totalSize()
+ {
+ long size = 0;
+ // calculate total length of transferring chunks
+ for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+ size += chunk.length + 4; // 4 bytes for CRC
+ return size;
+ }
+
+ // chunks are assumed to be sorted by offset
+ private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
+ {
+ List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
+ SSTableReader.PartitionPositionBounds lastSection = null;
+ for (CompressionMetadata.Chunk chunk : chunks)
+ {
+ if (lastSection != null)
+ {
+ if (chunk.offset == lastSection.upperPosition)
+ {
+ // extend previous section to end of this chunk
+ lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+ }
+ else
+ {
+ transferSections.add(lastSection);
+ lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
+ }
+ }
+ else
+ {
+ lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
+ }
+ }
+ if (lastSection != null)
+ transferSections.add(lastSection);
+ return transferSections;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
new file mode 100644
index 0000000..6f8f06a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraEntireSSTableStreamReader reads SSTable off the wire and writes it to disk.
+ * <p>
+ * Rather than deserializing partitions, it reads each component listed in the stream header's
+ * {@link ComponentManifest} ({@code manifest.sizeOf(component)} bytes, in manifest order) and hands it
+ * to a {@link BigTableZeroCopyWriter}. Only the BIG sstable format is accepted.
+ */
+public class CassandraEntireSSTableStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamReader.class);
+
+    private final TableId tableId;
+    private final StreamSession session;
+    private final CassandraStreamHeader header;
+    private final int fileSequenceNumber;
+
+    /**
+     * @param messageHeader supplies the table id, sequence number and pending repair id of the stream
+     * @param streamHeader  describes the sstable being received, including its component manifest
+     * @param session       the session this stream belongs to
+     * @throws AssertionError        if the sstable format is not {@code BIG}
+     * @throws IllegalStateException if the session has a pending repair id that differs from the message's
+     */
+    public CassandraEntireSSTableStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        if (streamHeader.format != SSTableFormat.Type.BIG)
+            throw new AssertionError("Unsupported SSTable format " + streamHeader.format);
+
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair sstables if the session has a pending repair id
+            if (!session.getPendingRepair().equals(messageHeader.pendingRepair))
+                throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
+        }
+
+        this.header = streamHeader;
+        this.session = session;
+        this.tableId = messageHeader.tableId;
+        this.fileSequenceNumber = messageHeader.sequenceNumber;
+    }
+
+    /**
+     * Receives every component of the sstable and returns the writer holding them.
+     * <p>
+     * On success the sstable level from the stream header is written into the new sstable's metadata
+     * before returning. If anything fails after the table lookup, the writer (if one was created) is
+     * aborted and the failure is rethrown unchecked: runtime exceptions and errors as-is, checked
+     * exceptions (including {@link IOException}s from reading {@code in} or creating the writer)
+     * wrapped in a {@link RuntimeException}.
+     *
+     * @param in where this reads data from
+     * @return SSTable transferred
+     * @throws IOException if the table no longer exists locally
+     */
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    @Override
+    public SSTableMultiWriter read(DataInputPlus in) throws IOException
+    {
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("Table " + tableId + " was dropped during streaming");
+        }
+
+        ComponentManifest manifest = header.componentManifest;
+        long totalSize = manifest.totalSize();
+
+        logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}",
+                     session.planId(),
+                     fileSequenceNumber,
+                     session.peer,
+                     prettyPrintMemory(totalSize),
+                     cfs.metadata());
+
+        BigTableZeroCopyWriter writer = null;
+
+        try
+        {
+            writer = createWriter(cfs, totalSize, manifest.components());
+            long bytesRead = 0;
+            for (Component component : manifest.components())
+            {
+                long length = manifest.sizeOf(component);
+
+                logger.debug("[Stream #{}] Started receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}",
+                             session.planId(),
+                             component,
+                             session.peer,
+                             prettyPrintMemory(length),
+                             prettyPrintMemory(bytesRead),
+                             prettyPrintMemory(totalSize));
+
+                writer.writeComponent(component.type, in, length);
+                // each component is reported as fully received once it has been written
+                session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length);
+                bytesRead += length;
+
+                logger.debug("[Stream #{}] Finished receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}",
+                             session.planId(),
+                             component,
+                             session.peer,
+                             prettyPrintMemory(length),
+                             prettyPrintMemory(bytesRead),
+                             prettyPrintMemory(totalSize));
+            }
+
+            // apply the level carried in the stream header to the received sstable's metadata
+            writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, header.sstableLevel);
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            logger.error("[Stream {}] Error while reading sstable from stream for table = {}", session.planId(), cfs.metadata(), e);
+            if (writer != null)
+                e = writer.abort(e);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Picks the directory that will hold the received sstable.
+     * <p>
+     * NOTE(review): the free-space check uses {@code getWriteableLocation(totalSize)}, but the directory
+     * actually returned is the one the disk boundaries assign to {@code header.firstKey} (falling back to
+     * {@code getDirectoryForNewSSTables()}), which may be a different data directory - confirm intended.
+     *
+     * @throws IOException if no data directory has {@code totalSize} bytes available
+     */
+    private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException
+    {
+        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+        if (localDir == null)
+            throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize)));
+
+        File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));
+
+        if (dir == null)
+            return cfs.getDirectories().getDirectoryForNewSSTables();
+
+        return dir;
+    }
+
+    /**
+     * Creates a zero-copy writer for a fresh sstable descriptor in the chosen data directory, registered
+     * with the lifecycle transaction of this session's {@link CassandraStreamReceiver} for the table.
+     */
+    @SuppressWarnings("resource")
+    protected BigTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long totalSize, Collection<Component> components) throws IOException
+    {
+        File dataDir = getDataDir(cfs, totalSize);
+
+        StreamReceiver streamReceiver = session.getAggregator(tableId);
+        assert streamReceiver instanceof CassandraStreamReceiver;
+
+        // reuse the receiver fetched above instead of looking the aggregator up a second time
+        LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(streamReceiver).getTransaction();
+
+        Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version, header.format);
+
+        logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), desc.filenameFor(Component.DATA), components);
+
+        return new BigTableZeroCopyWriter(desc, cfs.metadata, txn, components);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
new file mode 100644
index 0000000..7a20110
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraEntireSSTableStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraEntireSSTableStreamWriter
+{
+ private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamWriter.class);
+
+ private final SSTableReader sstable;
+ private final ComponentManifest manifest;
+ private final StreamSession session;
+ private final StreamRateLimiter limiter;
+
+ public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest)
+ {
+ this.session = session;
+ this.sstable = sstable;
+ this.manifest = manifest;
+ this.limiter = StreamManager.getRateLimiter(session.peer);
+ }
+
+ /**
+ * Stream the entire file to given channel.
+ * <p>
+ *
+ * @param out where this writes data to
+ * @throws IOException on any I/O error
+ */
+ public void write(ByteBufDataOutputStreamPlus out) throws IOException
+ {
+ long totalSize = manifest.totalSize();
+ logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}",
+ session.planId(),
+ sstable.getFilename(),
+ session.peer,
+ sstable.getSSTableMetadata().repairedAt,
+ prettyPrintMemory(totalSize));
+
+ long progress = 0L;
+
+ for (Component component : manifest.components())
+ {
+ @SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus
+ FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+ // Total Length to transmit for this file
+ long length = in.size();
+
+ // tracks write progress
+ logger.debug("[Stream #{}] Streaming {}.{} gen {} component {} size {}", session.planId(),
+ sstable.getKeyspaceName(),
+ sstable.getColumnFamilyName(),
+ sstable.descriptor.generation,
+ component,
+ prettyPrintMemory(length));
+
+ long bytesWritten = out.writeToChannel(in, limiter);
+ progress += bytesWritten;
+
+ session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, length);
+
+ logger.debug("[Stream #{}] Finished streaming {}.{} gen {} component {} to {}, xfered = {}, length = {}, totalSize = {}",
+ session.planId(),
+ sstable.getKeyspaceName(),
+ sstable.getColumnFamilyName(),
+ sstable.descriptor.generation,
+ component,
+ session.peer,
+ prettyPrintMemory(bytesWritten),
+ prettyPrintMemory(length),
+ prettyPrintMemory(totalSize));
+ }
+
+ out.flush();
+
+ logger.debug("[Stream #{}] Finished streaming sstable {} to {}, xfered = {}, totalSize = {}",
+ session.planId(),
+ sstable.getFilename(),
+ session.peer,
+ prettyPrintMemory(progress),
+ prettyPrintMemory(totalSize));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 16698e5..c65ca62 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.Objects;
import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -45,6 +47,8 @@ public class CassandraIncomingFile implements IncomingStream
private volatile SSTableMultiWriter sstable;
private volatile long size = -1;
+ private static final Logger logger = LoggerFactory.getLogger(CassandraIncomingFile.class);
+
public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, StreamMessageHeader header)
{
this.cfs = cfs;
@@ -56,9 +60,16 @@ public class CassandraIncomingFile implements IncomingStream
public synchronized void read(DataInputPlus in, int version) throws IOException
{
+ // The stream header comes first on the wire; it selects the reader and supplies the expected size.
CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version);
- CassandraStreamReader reader = !streamHeader.isCompressed()
- ? new CassandraStreamReader(header, streamHeader, session)
- : new CompressedCassandraStreamReader(header, streamHeader, session);
+ logger.debug("Incoming stream entireSSTable={} components={}", streamHeader.isEntireSSTable, streamHeader.componentManifest);
+
+ // Whole-sstable transfers are checked first; isCompressed() only selects between the two
+ // partial (section-based) readers.
+ IStreamReader reader;
+ if (streamHeader.isEntireSSTable)
+ reader = new CassandraEntireSSTableStreamReader(header, streamHeader, session);
+ else if (streamHeader.isCompressed())
+ reader = new CassandraCompressedStreamReader(header, streamHeader, session);
+ else
+ reader = new CassandraStreamReader(header, streamHeader, session);
+
size = streamHeader.size();
sstable = reader.read(in);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 6ec1f85..5252187 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -18,51 +18,100 @@
package org.apache.cassandra.db.streaming;
+import java.io.File;
import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.concurrent.Ref;
+import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
+
/**
* used to transfer the part(or whole) of a SSTable data file
*/
public class CassandraOutgoingFile implements OutgoingStream
{
+ public static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+ Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+ Component.DIGEST, Component.CRC);
+
private final Ref<SSTableReader> ref;
private final long estimatedKeys;
private final List<SSTableReader.PartitionPositionBounds> sections;
private final String filename;
private final CassandraStreamHeader header;
private final boolean keepSSTableLevel;
+ private final ComponentManifest manifest;
+ private Boolean isFullyContained;
- public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, long estimatedKeys)
+ private final List<Range<Token>> ranges;
+
+ /**
+ * @param operation BOOTSTRAP and REBUILD keep the sstable's level in the stream header; other operations send level 0
+ * @param ref reference to the sstable being sent; must not have a null referent
+ * @param sections partition position bounds to send when streaming partially
+ * @param ranges token ranges being transferred; shouldStreamEntireSSTable() checks that every key of the sstable falls inside them
+ * @param estimatedKeys passed through to the stream header
+ */
+ public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
+ List<SSTableReader.PartitionPositionBounds> sections, Collection<Range<Token>> ranges,
+ long estimatedKeys)
{
Preconditions.checkNotNull(ref.get());
this.ref = ref;
this.estimatedKeys = estimatedKeys;
this.sections = sections;
+ // must be assigned before the header is built below: shouldStreamEntireSSTable() reads this.ranges
+ this.ranges = ImmutableList.copyOf(ranges);
this.filename = ref.get().getFilename();
+ // also needed by the header builder below (withComponentManifest)
+ this.manifest = getComponentManifest(ref.get());
SSTableReader sstable = ref.get();
keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
- this.header = new CassandraStreamHeader(sstable.descriptor.version,
- sstable.descriptor.formatType,
- estimatedKeys,
- sections,
- sstable.compression ? sstable.getCompressionMetadata() : null,
- keepSSTableLevel ? sstable.getSSTableLevel() : 0,
- sstable.header.toComponent());
+ this.header =
+ CassandraStreamHeader.builder()
+ .withSSTableFormat(sstable.descriptor.formatType)
+ .withSSTableVersion(sstable.descriptor.version)
+ .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
+ .withEstimatedKeys(estimatedKeys)
+ .withSections(sections)
+ .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
+ .withSerializationHeader(sstable.header.toComponent())
+ // NOTE(review): this decision is serialized into the header, but write() re-evaluates
+ // shouldStreamEntireSSTable() and additionally requires a ByteBufDataOutputStreamPlus. If the two
+ // disagree (e.g. streamEntireSSTables() toggled in between, or another output type), the receiver
+ // expects whole components while partial sections are sent; consider basing write() on
+ // header.isEntireSSTable. Also note this public, non-final method runs during construction and may
+ // scan every key of the sstable (computeContainment); the result is cached in isFullyContained.
+ .isEntireSSTable(shouldStreamEntireSSTable())
+ .withComponentManifest(manifest)
+ .withFirstKey(sstable.first)
+ .withTableId(sstable.metadata().id)
+ .build();
+ }
+
+    /**
+     * Builds the manifest of sstable components to send for a whole-sstable transfer.
+     * Components from {@code STREAM_COMPONENTS} whose files do not exist on disk are skipped; each
+     * present component is recorded with its current file length, inserted in {@code STREAM_COMPONENTS}
+     * order (the {@link LinkedHashMap} keeps that insertion order).
+     */
+    @VisibleForTesting
+    public static ComponentManifest getComponentManifest(SSTableReader sstable)
+    {
+        LinkedHashMap<Component, Long> sizeByComponent = new LinkedHashMap<>(STREAM_COMPONENTS.size());
+        for (Component candidate : STREAM_COMPONENTS)
+        {
+            File onDisk = new File(sstable.descriptor.filenameFor(candidate));
+            if (!onDisk.exists())
+                continue; // not every sstable has every component file
+            sizeByComponent.put(candidate, onDisk.length());
+        }
+        return new ComponentManifest(sizeByComponent);
}
public static CassandraOutgoingFile fromStream(OutgoingStream stream)
@@ -114,11 +163,68 @@ public class CassandraOutgoingFile implements OutgoingStream
CassandraStreamHeader.serializer.serialize(header, out, version);
out.flush();
- CassandraStreamWriter writer = header.compressionInfo == null ?
- new CassandraStreamWriter(sstable, header.sections, session) :
- new CompressedCassandraStreamWriter(sstable, header.sections,
- header.compressionInfo, session);
- writer.write(out);
+ if (shouldStreamEntireSSTable() && out instanceof ByteBufDataOutputStreamPlus)
+ {
+ CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
+ writer.write((ByteBufDataOutputStreamPlus) out);
+ }
+ else
+ {
+ CassandraStreamWriter writer = (header.compressionInfo == null) ?
+ new CassandraStreamWriter(sstable, header.sections, session) :
+ new CassandraCompressedStreamWriter(sstable, header.sections,
+ header.compressionInfo, session);
+ writer.write(out);
+ }
+ }
+
+    /**
+     * Decides whether this sstable can be sent whole rather than as partition sections. All of the
+     * following must hold:
+     * <ul>
+     *   <li>whole-sstable streaming is enabled ({@code DatabaseDescriptor.streamEntireSSTables()});</li>
+     *   <li>the sstable has no legacy counter shards;</li>
+     *   <li>the table still exists locally;</li>
+     *   <li>the sstable's compaction strategy is a {@link LeveledCompactionStrategy};</li>
+     *   <li>every key of the sstable lies within {@code ranges} (see {@link #contained}).</li>
+     * </ul>
+     */
+    @VisibleForTesting
+    public boolean shouldStreamEntireSSTable()
+    {
+        if (!DatabaseDescriptor.streamEntireSSTables())
+            return false;
+
+        SSTableReader sstable = ref.get();
+        if (sstable.getSSTableMetadata().hasLegacyCounterShards)
+            return false;
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
+        if (cfs == null)
+            return false;
+
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategyManager().getCompactionStrategyFor(sstable);
+        return strategy instanceof LeveledCompactionStrategy && contained(ranges, sstable);
+    }
+
+    /**
+     * Returns whether every key of {@code sstable} lies within {@code normalizedRanges}.
+     * The answer is computed on the first call and cached in {@code isFullyContained}.
+     * NOTE(review): later calls return the cached value even if they pass different arguments.
+     */
+    @VisibleForTesting
+    public boolean contained(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    {
+        if (isFullyContained == null)
+            isFullyContained = computeContainment(normalizedRanges, sstable);
+        return isFullyContained;
+    }
+
+    /**
+     * Walks the sstable's keys and reports whether each one is accepted by a {@code RangeOwnHelper}
+     * built from {@code normalizedRanges}, stopping at the first rejected key.
+     * A {@code null} range list is treated as "not contained".
+     */
+    private boolean computeContainment(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    {
+        if (normalizedRanges == null)
+            return false;
+
+        RangeOwnHelper owner = new RangeOwnHelper(normalizedRanges);
+        try (KeyIterator keys = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            boolean allOwned = true;
+            while (allOwned && keys.hasNext())
+            {
+                DecoratedKey key = keys.next();
+                allOwned = owner.check(key);
+            }
+            return allOwned;
+        }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
index 43631b0..2af56de 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -15,16 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.db.streaming;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -32,6 +38,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
public class CassandraStreamHeader
{
@@ -50,33 +60,38 @@ public class CassandraStreamHeader
private final CompressionMetadata compressionMetadata;
public volatile CompressionInfo compressionInfo;
public final int sstableLevel;
- public final SerializationHeader.Component header;
+ public final SerializationHeader.Component serializationHeader;
+
+ /* flag indicating whether this is a partial or entire sstable transfer */
+ public final boolean isEntireSSTable;
+ /* first token of the sstable required for faster streaming */
+ public final DecoratedKey firstKey;
+ public final TableId tableId;
+ public final ComponentManifest componentManifest;
/* cached size value */
private transient final long size;
- private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
- {
- this.version = version;
- this.format = format;
- this.estimatedKeys = estimatedKeys;
- this.sections = sections;
- this.compressionMetadata = compressionMetadata;
- this.compressionInfo = compressionInfo;
- this.sstableLevel = sstableLevel;
- this.header = header;
-
- this.size = calculateSize();
- }
-
- public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header)
+ /**
+ * Copies every field from {@code builder}; use {@link #builder()} to obtain one.
+ */
+ private CassandraStreamHeader(Builder builder)
{
- this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header);
+ version = builder.version;
+ format = builder.format;
+ estimatedKeys = builder.estimatedKeys;
+ sections = builder.sections;
+ compressionMetadata = builder.compressionMetadata;
+ compressionInfo = builder.compressionInfo;
+ sstableLevel = builder.sstableLevel;
+ serializationHeader = builder.serializationHeader;
+ tableId = builder.tableId;
+ isEntireSSTable = builder.isEntireSSTable;
+ componentManifest = builder.componentManifest;
+ firstKey = builder.firstKey;
+ // must stay last: calculateSize() reads isEntireSSTable, componentManifest and compressionInfo
+ size = calculateSize();
}
- public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
+ /**
+ * Entry point for constructing a header; the constructor is private and takes a {@link Builder}.
+ */
+ public static Builder builder()
{
- this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header);
+ return new Builder();
}
public boolean isCompressed()
@@ -94,6 +109,9 @@ public class CassandraStreamHeader
private long calculateSize()
{
+ if (isEntireSSTable)
+ return componentManifest.totalSize();
+
long transferSize = 0;
if (compressionInfo != null)
{
@@ -112,9 +130,7 @@ public class CassandraStreamHeader
public synchronized void calculateCompressionInfo()
{
+ // Lazily derives compressionInfo from compressionMetadata for the sections being sent (the serializer
+ // calls this before writing it). No-op without compression metadata or once compressionInfo is set.
if (compressionMetadata != null && compressionInfo == null)
- {
compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections);
- }
}
@Override
@@ -125,9 +141,11 @@ public class CassandraStreamHeader
", format=" + format +
", estimatedKeys=" + estimatedKeys +
", sections=" + sections +
- ", compressionInfo=" + compressionInfo +
", sstableLevel=" + sstableLevel +
- ", header=" + header +
+ ", header=" + serializationHeader +
+ ", isEntireSSTable=" + isEntireSSTable +
+ ", firstKey=" + firstKey +
+ ", tableId=" + tableId +
'}';
}
@@ -138,20 +156,26 @@ public class CassandraStreamHeader
CassandraStreamHeader that = (CassandraStreamHeader) o;
return estimatedKeys == that.estimatedKeys &&
sstableLevel == that.sstableLevel &&
+ isEntireSSTable == that.isEntireSSTable &&
Objects.equals(version, that.version) &&
format == that.format &&
Objects.equals(sections, that.sections) &&
Objects.equals(compressionInfo, that.compressionInfo) &&
- Objects.equals(header, that.header);
+ Objects.equals(serializationHeader, that.serializationHeader) &&
+ Objects.equals(componentManifest, that.componentManifest) &&
+ Objects.equals(firstKey, that.firstKey) &&
+ Objects.equals(tableId, that.tableId);
}
public int hashCode()
{
- return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+ return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest,
+ isEntireSSTable, firstKey, tableId);
}
+ public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer();
- public static final IVersionedSerializer<CassandraStreamHeader> serializer = new IVersionedSerializer<CassandraStreamHeader>()
+ public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader>
{
public void serialize(CassandraStreamHeader header, DataOutputPlus out, int version) throws IOException
{
@@ -168,11 +192,33 @@ public class CassandraStreamHeader
header.calculateCompressionInfo();
CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
out.writeInt(header.sstableLevel);
- SerializationHeader.serializer.serialize(header.version, header.header, out);
+
+ SerializationHeader.serializer.serialize(header.version, header.serializationHeader, out);
+
+ header.tableId.serialize(out);
+ out.writeBoolean(header.isEntireSSTable);
+
+ if (header.isEntireSSTable)
+ {
+ ComponentManifest.serializer.serialize(header.componentManifest, out, version);
+ ByteBufferUtil.writeWithVIntLength(header.firstKey.getKey(), out);
+ }
}
public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException
{
+ return deserialize(in, version, tableId -> {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ if (cfs != null)
+ return cfs.getPartitioner();
+
+ return null;
+ });
+ }
+
+ @VisibleForTesting
+ public CassandraStreamHeader deserialize(DataInputPlus in, int version, Function<TableId, IPartitioner> partitionerMapper) throws IOException
+ {
Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
@@ -183,9 +229,36 @@ public class CassandraStreamHeader
sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
int sstableLevel = in.readInt();
+
SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in);
- return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+ TableId tableId = TableId.deserialize(in);
+ boolean isEntireSSTable = in.readBoolean();
+ ComponentManifest manifest = null;
+ DecoratedKey firstKey = null;
+
+ if (isEntireSSTable)
+ {
+ manifest = ComponentManifest.serializer.deserialize(in, version);
+ ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in);
+ IPartitioner partitioner = partitionerMapper.apply(tableId);
+ if (partitioner == null)
+ throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId %s", tableId));
+ firstKey = partitioner.decorateKey(keyBuf);
+ }
+
+ return builder().withSSTableFormat(format)
+ .withSSTableVersion(sstableVersion)
+ .withSSTableLevel(sstableLevel)
+ .withEstimatedKeys(estimatedKeys)
+ .withSections(sections)
+ .withCompressionInfo(compressionInfo)
+ .withSerializationHeader(header)
+ .withComponentManifest(manifest)
+ .isEntireSSTable(isEntireSSTable)
+ .withFirstKey(firstKey)
+ .withTableId(tableId)
+ .build();
}
public long serializedSize(CassandraStreamHeader header, int version)
@@ -201,12 +274,127 @@ public class CassandraStreamHeader
size += TypeSizes.sizeof(section.lowerPosition);
size += TypeSizes.sizeof(section.upperPosition);
}
+
+ header.calculateCompressionInfo();
size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
size += TypeSizes.sizeof(header.sstableLevel);
- size += SerializationHeader.serializer.serializedSize(header.version, header.header);
+ size += SerializationHeader.serializer.serializedSize(header.version, header.serializationHeader);
+
+ size += header.tableId.serializedSize();
+ size += TypeSizes.sizeof(header.isEntireSSTable);
+ if (header.isEntireSSTable)
+ {
+ size += ComponentManifest.serializer.serializedSize(header.componentManifest, version);
+ size += ByteBufferUtil.serializedSizeWithVIntLength(header.firstKey.getKey());
+ }
return size;
}
- };
+ }
+
+ /**
+ * Fluent builder for {@link CassandraStreamHeader}. Each {@code with...} method records its argument and
+ * returns this builder; nothing is validated until {@link #build()}.
+ */
+ public static final class Builder
+ {
+ private Version version;
+ private SSTableFormat.Type format;
+ private long estimatedKeys;
+ private List<SSTableReader.PartitionPositionBounds> sections;
+ private CompressionMetadata compressionMetadata;
+ private CompressionInfo compressionInfo;
+ private int sstableLevel;
+ private SerializationHeader.Component serializationHeader;
+ private ComponentManifest componentManifest;
+ private boolean isEntireSSTable;
+ private DecoratedKey firstKey;
+ private TableId tableId;
+
+ public Builder withSSTableFormat(SSTableFormat.Type format)
+ {
+ this.format = format;
+ return this;
+ }
+
+ public Builder withSSTableVersion(Version version)
+ {
+ this.version = version;
+ return this;
+ }
+
+ public Builder withSSTableLevel(int sstableLevel)
+ {
+ this.sstableLevel = sstableLevel;
+ return this;
+ }
+
+ public Builder withEstimatedKeys(long estimatedKeys)
+ {
+ this.estimatedKeys = estimatedKeys;
+ return this;
+ }
+
+ public Builder withSections(List<SSTableReader.PartitionPositionBounds> sections)
+ {
+ this.sections = sections;
+ return this;
+ }
+
+ /**
+ * Sets the compression metadata of the source SSTable. If no {@link CompressionInfo} is supplied through
+ * {@link #withCompressionInfo}, the built header derives one lazily from this metadata and the sections.
+ */
+ public Builder withCompressionMetadata(CompressionMetadata compressionMetadata)
+ {
+ this.compressionMetadata = compressionMetadata;
+ return this;
+ }
+
+ /**
+ * Sets an already computed {@link CompressionInfo} (e.g. one read off the wire by the deserializer).
+ * When present it is used as-is and is not recomputed from compression metadata.
+ */
+ public Builder withCompressionInfo(CompressionInfo compressionInfo)
+ {
+ this.compressionInfo = compressionInfo;
+ return this;
+ }
+
+ public Builder withSerializationHeader(SerializationHeader.Component header)
+ {
+ this.serializationHeader = header;
+ return this;
+ }
+
+ public Builder withTableId(TableId tableId)
+ {
+ this.tableId = tableId;
+ return this;
+ }
+
+ /**
+ * Marks the header as describing an entire-SSTable transfer, whose payload size comes from a
+ * {@link ComponentManifest} instead of the sections. When {@code true}, {@link #build()} additionally
+ * requires a component manifest and a first key.
+ */
+ public Builder isEntireSSTable(boolean isEntireSSTable)
+ {
+ this.isEntireSSTable = isEntireSSTable;
+ return this;
+ }
+
+ public Builder withComponentManifest(ComponentManifest componentManifest)
+ {
+ this.componentManifest = componentManifest;
+ return this;
+ }
+
+ public Builder withFirstKey(DecoratedKey firstKey)
+ {
+ this.firstKey = firstKey;
+ return this;
+ }
+
+ /**
+ * Validates the collected fields and creates the header.
+ *
+ * @return the new header
+ * @throws NullPointerException if the SSTable version, format, sections, serialization header or table id is
+ * missing, or, for an entire-SSTable header, if the component manifest or first key is missing; the
+ * exception message names the missing field
+ */
+ public CassandraStreamHeader build()
+ {
+ checkNotNull(version, "SSTable version is required");
+ checkNotNull(format, "SSTable format is required");
+ checkNotNull(sections, "sections are required");
+ checkNotNull(serializationHeader, "serialization header is required");
+ checkNotNull(tableId, "table id is required");
+
+ if (isEntireSSTable)
+ {
+ // Only entire-SSTable headers serialize a manifest and a first key.
+ checkNotNull(componentManifest, "component manifest is required when streaming an entire SSTable");
+ checkNotNull(firstKey, "first key is required when streaming an entire SSTable");
+ }
+
+ return new CassandraStreamHeader(this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index 673b62c..43667d0 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.TableStreamManager;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -152,7 +151,8 @@ public class CassandraStreamManager implements TableStreamManager
ref.release();
continue;
}
- streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, sstable.estimatedKeysForRanges(ranges)));
+ streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, ranges,
+ sstable.estimatedKeysForRanges(ranges)));
}
return streams;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 3930196..fccabfe 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.FBUtilities;
/**
* CassandraStreamReader reads from stream and writes to SSTable.
*/
-public class CassandraStreamReader
+public class CassandraStreamReader implements IStreamReader
{
private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class);
protected final TableId tableId;
@@ -85,7 +85,7 @@ public class CassandraStreamReader
this.pendingRepair = header.pendingRepair;
this.format = streamHeader.format;
this.sstableLevel = streamHeader.sstableLevel;
- this.header = streamHeader.header;
+ this.header = streamHeader.serializationHeader;
this.fileSeqNum = header.sequenceNumber;
}
@@ -95,6 +95,7 @@ public class CassandraStreamReader
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+ @Override
public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
{
long totalSize = totalSize();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
new file mode 100644
index 0000000..90e3dbd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Ordered manifest of SSTable {@link Component}s and their lengths, carried by a {@link CassandraStreamHeader}
+ * when an entire SSTable is streamed.
+ * <p>
+ * The order of the map handed to the constructor is preserved: iteration, {@link #components()} and the
+ * serialized form all follow it, and deserialization rebuilds the entries in the same order.
+ */
+public final class ComponentManifest implements Iterable<Component>
+{
+ // LinkedHashMap rather than HashMap so that insertion order survives copying and serialization.
+ private final LinkedHashMap<Component, Long> components;
+
+ /**
+ * @param components component lengths, in the order they should be listed; the map is copied, so later
+ * changes made by the caller are not reflected in this manifest
+ */
+ public ComponentManifest(Map<Component, Long> components)
+ {
+ this.components = new LinkedHashMap<>(components);
+ }
+
+ /**
+ * @return the recorded length of {@code component}
+ * @throws IllegalArgumentException if the component is not part of this manifest
+ */
+ public long sizeOf(Component component)
+ {
+ Long size = components.get(component);
+ if (size == null)
+ throw new IllegalArgumentException("Component " + component + " is not present in the manifest");
+ return size;
+ }
+
+ /**
+ * @return the sum of the lengths of all components in the manifest
+ */
+ public long totalSize()
+ {
+ long totalSize = 0;
+ for (Long size : components.values())
+ totalSize += size;
+ return totalSize;
+ }
+
+ /**
+ * @return a new, independently mutable list of the components in manifest order
+ */
+ public List<Component> components()
+ {
+ return new ArrayList<>(components.keySet());
+ }
+
+ // Map equality ignores ordering, so manifests listing the same components and lengths in a different
+ // order compare equal (and hash alike).
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof ComponentManifest))
+ return false;
+
+ ComponentManifest that = (ComponentManifest) o;
+ return components.equals(that.components);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return components.hashCode();
+ }
+
+ /**
+ * Wire format: the component count as an unsigned vint, then for every component its name (writeUTF)
+ * followed by its length as an unsigned vint. The messaging {@code version} argument is not used.
+ */
+ public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
+ {
+ public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeUnsignedVInt(manifest.components.size());
+ for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+ {
+ out.writeUTF(entry.getKey().name);
+ out.writeUnsignedVInt(entry.getValue());
+ }
+ }
+
+ public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException
+ {
+ long count = in.readUnsignedVInt();
+ // The count arrives as a raw 64-bit value. Anything outside the non-negative int range can only come
+ // from a corrupt stream; reject it rather than silently truncating it with an (int) cast.
+ if (count < 0 || count > Integer.MAX_VALUE)
+ throw new IOException("Invalid number of components in manifest: " + count);
+
+ // Deliberately not presized from the wire value, so a corrupt count cannot force a huge allocation.
+ LinkedHashMap<Component, Long> components = new LinkedHashMap<>();
+
+ for (long i = 0; i < count; i++)
+ {
+ Component component = Component.parse(in.readUTF());
+ long length = in.readUnsignedVInt();
+ components.put(component, length);
+ }
+
+ return new ComponentManifest(components);
+ }
+
+ public long serializedSize(ComponentManifest manifest, int version)
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(manifest.components.size());
+ for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+ {
+ size += TypeSizes.sizeof(entry.getKey().name);
+ size += TypeSizes.sizeofUnsignedVInt(entry.getValue());
+ }
+ return size;
+ }
+ };
+
+ /**
+ * Iterates over the components in manifest order; the returned iterator does not support {@code remove()}.
+ */
+ @Override
+ public Iterator<Component> iterator()
+ {
+ return Iterators.unmodifiableIterator(components.keySet().iterator());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
deleted file mode 100644
index c71edfb..0000000
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.streaming;
-
-import java.io.IOException;
-
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.StreamMessageHeader;
-import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
-/**
- * CassandraStreamReader that reads from streamed compressed SSTable
- */
-public class CompressedCassandraStreamReader extends CassandraStreamReader
-{
- private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamReader.class);
-
- protected final CompressionInfo compressionInfo;
-
- public CompressedCassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
- {
- super(header, streamHeader, session);
- this.compressionInfo = streamHeader.compressionInfo;
- }
-
- /**
- * @return SSTable transferred
- * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
- */
- @Override
- @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
- public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
- {
- long totalSize = totalSize();
-
- ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
-
- if (cfs == null)
- {
- // schema was dropped during streaming
- throw new IOException("CF " + tableId + " was dropped during streaming");
- }
-
- logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
- session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
- cfs.getTableName());
-
- StreamDeserializer deserializer = null;
- SSTableMultiWriter writer = null;
- try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
- {
- TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
- deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
- writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
- String filename = writer.getFilename();
- int sectionIdx = 0;
- for (SSTableReader.PartitionPositionBounds section : sections)
- {
- assert cis.getTotalCompressedBytesRead() <= totalSize;
- long sectionLength = section.upperPosition - section.lowerPosition;
-
- logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
- // skip to beginning of section inside chunk
- cis.position(section.lowerPosition);
- in.reset(0);
-
- while (in.getBytesRead() < sectionLength)
- {
- writePartition(deserializer, writer);
- // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
- }
- }
- logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
- session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
- return writer;
- }
- catch (Throwable e)
- {
- Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
- logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
- session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
- if (writer != null)
- {
- writer.abort(e);
- }
- if (extractIOExceptionCause(e).isPresent())
- throw e;
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- protected long totalSize()
- {
- long size = 0;
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
- return size;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org