Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/17 17:07:07 UTC
[11/12] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5
Merge branch 'cassandra-3.0' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/587773fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/587773fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/587773fa
Branch: refs/heads/cassandra-3.5
Commit: 587773fa478ff64aa46cf17760eb31d6f83fc46d
Parents: e3716ee e8651b6
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Mar 17 10:42:20 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 17 10:42:20 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +
.../org/apache/cassandra/db/Directories.java | 30 +
.../cassandra/db/SerializationHeader.java | 5 +
.../org/apache/cassandra/db/Serializers.java | 114 ++--
.../columniterator/AbstractSSTableIterator.java | 4 +-
.../EncryptedFileSegmentInputStream.java | 4 +-
.../cassandra/hints/ChecksummedDataInput.java | 8 +-
.../org/apache/cassandra/hints/HintMessage.java | 4 +-
.../io/compress/CompressedSequentialWriter.java | 8 +-
.../io/sstable/SSTableSimpleIterator.java | 11 +-
.../sstable/format/RangeAwareSSTableWriter.java | 8 +-
.../io/sstable/format/SSTableReader.java | 2 +-
.../io/sstable/format/SSTableWriter.java | 2 +-
.../io/sstable/format/big/BigTableWriter.java | 4 +-
.../cassandra/io/util/BytesReadTracker.java | 30 +
.../apache/cassandra/io/util/DataPosition.java | 21 +
.../apache/cassandra/io/util/FileDataInput.java | 8 +-
.../org/apache/cassandra/io/util/FileMark.java | 20 -
.../io/util/FileSegmentInputStream.java | 12 +-
.../cassandra/io/util/RandomAccessReader.java | 8 +-
.../cassandra/io/util/RewindableDataInput.java | 30 +
.../io/util/RewindableDataInputStreamPlus.java | 569 +++++++++++++++++++
.../cassandra/io/util/SequentialWriter.java | 6 +-
.../cassandra/io/util/TrackedDataInputPlus.java | 150 +++++
.../cassandra/io/util/TrackedInputStream.java | 76 +++
.../cassandra/service/StorageService.java | 1 +
.../cassandra/streaming/StreamReader.java | 85 ++-
.../compress/CompressedStreamReader.java | 18 +-
.../streaming/messages/FileMessageHeader.java | 4 +-
.../apache/cassandra/tools/nodetool/Repair.java | 2 +-
.../cassandra/utils/BytesReadTracker.java | 153 -----
.../cassandra/utils/CloseableIterator.java | 1 -
...acy_jb_clust_compact-jb-1-CompressionInfo.db | Bin 0 -> 83 bytes
..._tables-legacy_jb_clust_compact-jb-1-Data.db | Bin 0 -> 5270 bytes
...ables-legacy_jb_clust_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...tables-legacy_jb_clust_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
...s-legacy_jb_clust_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
...bles-legacy_jb_clust_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
..._tables-legacy_jb_clust_compact-jb-1-TOC.txt | 7 +
...lust_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 75 bytes
...legacy_jb_clust_counter_compact-jb-1-Data.db | Bin 0 -> 4228 bytes
...gacy_jb_clust_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...egacy_jb_clust_counter_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
..._jb_clust_counter_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
...acy_jb_clust_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
...legacy_jb_clust_counter_compact-jb-1-TOC.txt | 7 +
...cy_jb_simple_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
...tables-legacy_jb_simple_compact-jb-1-Data.db | Bin 0 -> 108 bytes
...bles-legacy_jb_simple_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...ables-legacy_jb_simple_compact-jb-1-Index.db | Bin 0 -> 75 bytes
...-legacy_jb_simple_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
...les-legacy_jb_simple_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
...tables-legacy_jb_simple_compact-jb-1-TOC.txt | 7 +
...mple_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
...egacy_jb_simple_counter_compact-jb-1-Data.db | Bin 0 -> 118 bytes
...acy_jb_simple_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...gacy_jb_simple_counter_compact-jb-1-Index.db | Bin 0 -> 75 bytes
...jb_simple_counter_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
...cy_jb_simple_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
...egacy_jb_simple_counter_compact-jb-1-TOC.txt | 7 +
...acy_ka_clust_compact-ka-1-CompressionInfo.db | Bin 0 -> 83 bytes
..._tables-legacy_ka_clust_compact-ka-1-Data.db | Bin 0 -> 5277 bytes
...les-legacy_ka_clust_compact-ka-1-Digest.sha1 | 1 +
...ables-legacy_ka_clust_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...tables-legacy_ka_clust_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
...s-legacy_ka_clust_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
...bles-legacy_ka_clust_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
..._tables-legacy_ka_clust_compact-ka-1-TOC.txt | 8 +
...lust_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 75 bytes
...legacy_ka_clust_counter_compact-ka-1-Data.db | Bin 0 -> 4527 bytes
...cy_ka_clust_counter_compact-ka-1-Digest.sha1 | 1 +
...gacy_ka_clust_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...egacy_ka_clust_counter_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
..._ka_clust_counter_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
...acy_ka_clust_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
...legacy_ka_clust_counter_compact-ka-1-TOC.txt | 8 +
...cy_ka_simple_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
...tables-legacy_ka_simple_compact-ka-1-Data.db | Bin 0 -> 105 bytes
...es-legacy_ka_simple_compact-ka-1-Digest.sha1 | 1 +
...bles-legacy_ka_simple_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...ables-legacy_ka_simple_compact-ka-1-Index.db | Bin 0 -> 75 bytes
...-legacy_ka_simple_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
...les-legacy_ka_simple_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
...tables-legacy_ka_simple_compact-ka-1-TOC.txt | 8 +
...mple_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
...egacy_ka_simple_counter_compact-ka-1-Data.db | Bin 0 -> 124 bytes
...y_ka_simple_counter_compact-ka-1-Digest.sha1 | 1 +
...acy_ka_simple_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...gacy_ka_simple_counter_compact-ka-1-Index.db | Bin 0 -> 75 bytes
...ka_simple_counter_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
...cy_ka_simple_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
...egacy_ka_simple_counter_compact-ka-1-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_la_clust_compact/la-1-big-Data.db | Bin 0 -> 5286 bytes
.../la-1-big-Digest.adler32 | 1 +
.../legacy_la_clust_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_la_clust_compact/la-1-big-Index.db | Bin 0 -> 157685 bytes
.../la-1-big-Statistics.db | Bin 0 -> 6859 bytes
.../legacy_la_clust_compact/la-1-big-Summary.db | Bin 0 -> 75 bytes
.../legacy_la_clust_compact/la-1-big-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../la-1-big-Data.db | Bin 0 -> 4527 bytes
.../la-1-big-Digest.adler32 | 1 +
.../la-1-big-Filter.db | Bin 0 -> 24 bytes
.../la-1-big-Index.db | Bin 0 -> 157685 bytes
.../la-1-big-Statistics.db | Bin 0 -> 6859 bytes
.../la-1-big-Summary.db | Bin 0 -> 75 bytes
.../la-1-big-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_la_simple_compact/la-1-big-Data.db | Bin 0 -> 106 bytes
.../la-1-big-Digest.adler32 | 1 +
.../legacy_la_simple_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_la_simple_compact/la-1-big-Index.db | Bin 0 -> 75 bytes
.../la-1-big-Statistics.db | Bin 0 -> 4453 bytes
.../la-1-big-Summary.db | Bin 0 -> 75 bytes
.../legacy_la_simple_compact/la-1-big-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../la-1-big-Data.db | Bin 0 -> 124 bytes
.../la-1-big-Digest.adler32 | 1 +
.../la-1-big-Filter.db | Bin 0 -> 24 bytes
.../la-1-big-Index.db | Bin 0 -> 75 bytes
.../la-1-big-Statistics.db | Bin 0 -> 4453 bytes
.../la-1-big-Summary.db | Bin 0 -> 75 bytes
.../la-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_ma_clust_compact/ma-1-big-Data.db | Bin 0 -> 5393 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../legacy_ma_clust_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_ma_clust_compact/ma-1-big-Index.db | Bin 0 -> 157553 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 7046 bytes
.../legacy_ma_clust_compact/ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_ma_clust_compact/ma-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../ma-1-big-Data.db | Bin 0 -> 4606 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../ma-1-big-Index.db | Bin 0 -> 157553 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 7055 bytes
.../ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../ma-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_ma_simple_compact/ma-1-big-Data.db | Bin 0 -> 91 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../legacy_ma_simple_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_ma_simple_compact/ma-1-big-Index.db | Bin 0 -> 26 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 4640 bytes
.../ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_ma_simple_compact/ma-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../ma-1-big-Data.db | Bin 0 -> 114 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../ma-1-big-Index.db | Bin 0 -> 27 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 4649 bytes
.../ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../ma-1-big-TOC.txt | 8 +
.../cassandra/AbstractSerializationsTester.java | 1 -
.../apache/cassandra/db/DirectoriesTest.java | 98 ++--
.../cassandra/gms/SerializationsTest.java | 1 -
.../CompressedRandomAccessReaderTest.java | 6 +-
.../CompressedSequentialWriterTest.java | 4 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 369 ++++++------
.../io/util/BufferedRandomAccessFileTest.java | 4 +-
.../io/util/RandomAccessReaderTest.java | 2 +-
.../util/RewindableDataInputStreamPlusTest.java | 539 ++++++++++++++++++
.../cassandra/utils/BytesReadTrackerTest.java | 104 +++-
167 files changed, 2115 insertions(+), 550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a01e511,51cfc16..53dd292
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,6 -1,6 +1,7 @@@
-3.0.5
+3.5
+Merged from 3.0:
+ * Support streaming pre-3.0 sstables (CASSANDRA-10990)
- * Add backpressure to compressed commit log (CASSANDRA-10971)
+ * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971)
* SSTableExport supports secondary index tables (CASSANDRA-11330)
* Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
* Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Serializers.java
index 17f1de0,348fda3..cef06a3
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@@ -46,62 -46,77 +46,77 @@@ public class Serializer
// unnecessary (since IndexInfo.Serializer won't depend on the metadata either).
public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header)
{
- if (!version.storeRows())
+ if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables
{
- return new ISerializer<ClusteringPrefix>()
+ return oldFormatSerializer(version);
+ }
+
+ return newFormatSerializer(version, header);
+ }
+
+ private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version)
+ {
+ return new ISerializer<ClusteringPrefix>()
+ {
+ SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata);
+
+ public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
{
- public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
- {
- // We should only use this for reading old sstable, never write new ones.
- throw new UnsupportedOperationException();
- }
+ //we deserialize in the old format and serialize in the new format
+ ClusteringPrefix.serializer.serialize(clustering, out,
+ version.correspondingMessagingVersion(),
+ newHeader.clusteringTypes());
+ }
+
+ public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+ {
+ // We're reading the old cellname/composite
+ ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
+ assert bb.hasRemaining(); // empty cellnames were invalid
+
+ int clusteringSize = metadata.clusteringColumns().size();
+ // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
+ if (clusteringSize == 0)
+ return Clustering.EMPTY;
+
+ if (!metadata.isCompound())
- return new Clustering(bb);
++ return Clustering.make(bb);
- public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+ List<ByteBuffer> components = CompositeType.splitName(bb);
+ byte eoc = CompositeType.lastEOC(bb);
+
+ if (eoc == 0 || components.size() >= clusteringSize)
{
- // We're reading the old cellname/composite
- ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
- assert bb.hasRemaining(); // empty cellnames were invalid
-
- int clusteringSize = metadata.clusteringColumns().size();
- // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
- if (clusteringSize == 0)
- return Clustering.EMPTY;
-
- if (!metadata.isCompound())
- return Clustering.make(bb);
-
- List<ByteBuffer> components = CompositeType.splitName(bb);
- byte eoc = CompositeType.lastEOC(bb);
-
- if (eoc == 0 || components.size() >= clusteringSize)
- {
- // That's a clustering.
- if (components.size() > clusteringSize)
- components = components.subList(0, clusteringSize);
-
- return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
- }
- else
- {
- // It's a range tombstone bound. It is a start since that's the only part we've ever included
- // in the index entries.
- Slice.Bound.Kind boundKind = eoc > 0
- ? Slice.Bound.Kind.EXCL_START_BOUND
- : Slice.Bound.Kind.INCL_START_BOUND;
-
- return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
- }
- }
+ // That's a clustering.
+ if (components.size() > clusteringSize)
+ components = components.subList(0, clusteringSize);
- public long serializedSize(ClusteringPrefix clustering)
- return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
++ return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
+ }
+ else
{
- // We should only use this for reading old sstable, never write new ones.
- throw new UnsupportedOperationException();
+ // It's a range tombstone bound. It is a start since that's the only part we've ever included
+ // in the index entries.
+ Slice.Bound.Kind boundKind = eoc > 0
+ ? Slice.Bound.Kind.EXCL_START_BOUND
+ : Slice.Bound.Kind.INCL_START_BOUND;
+
+ return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
}
- };
- }
+ }
- return new ISerializer<ClusteringPrefix>()
+ public long serializedSize(ClusteringPrefix clustering)
+ {
+ return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(),
+ newHeader.clusteringTypes());
+ }
+ };
+ }
+
+
+ private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header)
+ {
+ return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format
{
public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
{
@@@ -119,4 -134,5 +134,5 @@@
}
};
}
- }
+
-}
++}
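
For reference, the pre-3.0 encoding that the merged oldFormatSerializer() decodes above stores a compound cellname as a sequence of <2-byte length><value><end-of-component byte> triples. The trailing EOC decides the result: zero (or a full set of clustering components) yields a Clustering, while a non-zero EOC marks a range tombstone start bound, exclusive if positive and inclusive if negative. A minimal standalone sketch of that layout, using simplified stand-ins for the CompositeType.splitName()/lastEOC() helpers rather than the real API:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the pre-3.0 compound cellname layout that
// oldFormatSerializer() deserializes: <unsigned short length><bytes><EOC byte>
// per component, with the last EOC driving the clustering-vs-bound choice.
public class OldCompositeSketch
{
    static List<ByteBuffer> splitName(ByteBuffer bb)
    {
        List<ByteBuffer> components = new ArrayList<>();
        ByteBuffer dup = bb.duplicate();
        while (dup.hasRemaining())
        {
            int length = dup.getShort() & 0xFFFF;
            ByteBuffer component = dup.slice();
            component.limit(length);
            components.add(component);
            dup.position(dup.position() + length);
            dup.get(); // skip this component's EOC byte
        }
        return components;
    }

    static byte lastEOC(ByteBuffer bb)
    {
        return bb.get(bb.limit() - 1); // the final byte is the last component's EOC
    }

    public static void main(String[] args)
    {
        // One component "ck1" with EOC 0: decodes as a full clustering value.
        ByteBuffer bb = ByteBuffer.allocate(2 + 3 + 1);
        bb.putShort((short) 3).put("ck1".getBytes(StandardCharsets.UTF_8)).put((byte) 0).flip();
        System.out.println(splitName(bb).size() + " component(s), last EOC = " + lastEOC(bb));
        // A non-zero EOC with fewer components than clustering columns would
        // instead decode as a range tombstone start bound, per deserialize() above.
    }
}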
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index d55161b,0e2012e..7f2e3bb
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@@ -29,10 -30,10 +29,10 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.util.FileDataInput;
- import org.apache.cassandra.io.util.FileMark;
+ import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.utils.ByteBufferUtil;
-abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
+abstract class AbstractSSTableIterator implements UnfilteredRowIterator
{
protected final SSTableReader sstable;
protected final DecoratedKey key;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
index 6915196,0000000..56bb7d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@@ -1,73 -1,0 +1,73 @@@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInput;
+import java.nio.ByteBuffer;
+
++import org.apache.cassandra.io.util.DataPosition;
+import org.apache.cassandra.io.util.FileDataInput;
- import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+
+/**
+ * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
+ * to reconstruct the full segment.
+ */
+public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput
+{
+ private final long segmentOffset;
+ private final int expectedLength;
+ private final ChunkProvider chunkProvider;
+
+ /**
+ * offset of the decrypted chunks already processed in this segment.
+ */
+ private int totalChunkOffset;
+
+ public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider)
+ {
+ super(chunkProvider.nextChunk(), filePath, position);
+ this.segmentOffset = segmentOffset;
+ this.expectedLength = expectedLength;
+ this.chunkProvider = chunkProvider;
+ }
+
+ public interface ChunkProvider
+ {
+ /**
+ * Get the next chunk from the backing provider, if any chunks remain.
+ * @return Next chunk, else null if no more chunks remain.
+ */
+ ByteBuffer nextChunk();
+ }
+
+ public long getFilePointer()
+ {
+ return segmentOffset + totalChunkOffset + buffer.position();
+ }
+
+ public boolean isEOF()
+ {
+ return totalChunkOffset + buffer.position() >= expectedLength;
+ }
+
+ public long bytesRemaining()
+ {
+ return expectedLength - (totalChunkOffset + buffer.position());
+ }
+
+ public void seek(long position)
+ {
+ // implement this when we actually need it
+ throw new UnsupportedOperationException();
+ }
+
- public long bytesPastMark(FileMark mark)
++ public long bytesPastMark(DataPosition mark)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reBuffer()
+ {
+ totalChunkOffset += buffer.position();
+ buffer = chunkProvider.nextChunk();
+ }
+}
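
The ChunkProvider hook introduced here is what lets commit log replay feed this stream one decrypted chunk at a time: reBuffer() folds the consumed chunk's length into totalChunkOffset and asks the provider for the next buffer. A hedged sketch of a provider over already-decrypted in-memory buffers (ChunkProviderSketch and its two-chunk data are illustrative, not the real decryption path):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;

// Illustrative provider over pre-decrypted buffers; real commit log replay
// would decrypt each on-disk chunk before handing it out.
public class ChunkProviderSketch
{
    interface ChunkProvider { ByteBuffer nextChunk(); } // mirrors the new interface

    public static void main(String[] args)
    {
        Iterator<ByteBuffer> chunks = Arrays.asList(
                ByteBuffer.wrap(new byte[]{ 1, 2, 3 }),
                ByteBuffer.wrap(new byte[]{ 4, 5 })).iterator();
        ChunkProvider provider = () -> chunks.hasNext() ? chunks.next() : null;

        // reBuffer() advances exactly like this: remember how much of the old
        // chunk was consumed, then swap in the next decrypted chunk.
        int totalChunkOffset = 0;
        for (ByteBuffer buffer = provider.nextChunk(); buffer != null; buffer = provider.nextChunk())
        {
            buffer.position(buffer.limit()); // pretend the chunk was fully read
            totalChunkOffset += buffer.position();
        }
        System.out.println("bytes seen: " + totalChunkOffset); // prints 5
    }
}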
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 674ed7f,0000000..9fcdfa4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -1,205 -1,0 +1,205 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.service.StorageService;
+
+public class RangeAwareSSTableWriter implements SSTableMultiWriter
+{
+ private final List<PartitionPosition> boundaries;
+ private final Directories.DataDirectory[] directories;
+ private final int sstableLevel;
+ private final long estimatedKeys;
+ private final long repairedAt;
+ private final SSTableFormat.Type format;
- private final SerializationHeader.Component header;
++ private final SerializationHeader header;
+ private final LifecycleTransaction txn;
+ private int currentIndex = -1;
+ public final ColumnFamilyStore cfs;
+ private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
+ private final List<SSTableReader> finishedReaders = new ArrayList<>();
+ private SSTableMultiWriter currentWriter = null;
+
- public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException
++ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
+ {
+ directories = cfs.getDirectories().getWriteableLocations();
+ this.sstableLevel = sstableLevel;
+ this.cfs = cfs;
+ this.estimatedKeys = estimatedKeys / directories.length;
+ this.repairedAt = repairedAt;
+ this.format = format;
+ this.txn = txn;
+ this.header = header;
+ boundaries = StorageService.getDiskBoundaries(cfs, directories);
+ if (boundaries == null)
+ {
+ Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+ if (localDir == null)
+ throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
+ }
+ }
+
+ private void maybeSwitchWriter(DecoratedKey key)
+ {
+ if (boundaries == null)
+ return;
+
+ boolean switched = false;
+ while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
+ {
+ switched = true;
+ currentIndex++;
+ }
+
+ if (switched)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
++ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
+ }
+ }
+
+ public boolean append(UnfilteredRowIterator partition)
+ {
+ maybeSwitchWriter(partition.partitionKey());
+ return currentWriter.append(partition);
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer.getFilePointer() > 0)
+ finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult));
+ else
+ SSTableMultiWriter.abortOrDie(writer);
+ }
+ return finishedReaders;
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer.getFilePointer() > 0)
+ finishedReaders.addAll(writer.finish(openResult));
+ else
+ SSTableMultiWriter.abortOrDie(writer);
+ }
+ return finishedReaders;
+ }
+
+ @Override
+ public Collection<SSTableReader> finished()
+ {
+ return finishedReaders;
+ }
+
+ @Override
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ finishedWriters.forEach((w) -> w.setOpenResult(openResult));
+ currentWriter.setOpenResult(openResult);
+ return this;
+ }
+
+ public String getFilename()
+ {
+ return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ @Override
+ public long getFilePointer()
+ {
+ return currentWriter.getFilePointer();
+ }
+
+ @Override
+ public UUID getCfId()
+ {
+ return currentWriter.getCfId();
+ }
+
+ @Override
+ public Throwable commit(Throwable accumulate)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ accumulate = writer.commit(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public Throwable abort(Throwable accumulate)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter finishedWriter : finishedWriters)
+ accumulate = finishedWriter.abort(accumulate);
+
+ return accumulate;
+ }
+
+ @Override
+ public void prepareToCommit()
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
+ }
+
+ @Override
+ public void close()
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ finishedWriters.forEach(SSTableMultiWriter::close);
+ }
+}
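
The heart of the new writer is maybeSwitchWriter(): partition keys arrive in token order, and currentIndex only advances once a key crosses the next disk boundary, so each output sstable stays within a single data directory. A standalone sketch of that advance rule, with plain integers standing in for PartitionPosition boundaries (illustrative only):

import java.util.Arrays;
import java.util.List;

// Boundary-advance rule from RangeAwareSSTableWriter.maybeSwitchWriter(),
// with integers standing in for tokens: boundaries.get(i) is the upper
// bound of the keys that disk i should receive.
public class BoundarySwitchSketch
{
    public static void main(String[] args)
    {
        List<Integer> boundaries = Arrays.asList(10, 20, 30); // one per data directory
        int currentIndex = -1;

        for (int key : new int[]{ 3, 9, 15, 16, 29 })
        {
            boolean switched = false;
            while (currentIndex < 0 || key > boundaries.get(currentIndex))
            {
                switched = true;
                currentIndex++; // move on to the next disk's range
            }
            if (switched)
                System.out.printf("key %d -> open new writer on disk %d%n", key, currentIndex);
            else
                System.out.printf("key %d -> keep writer on disk %d%n", key, currentIndex);
        }
    }
}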
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index ab38ba9,5f35029..6aaf776
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -83,9 -80,8 +83,9 @@@ public abstract class SSTableWriter ext
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
- this.header = header;
+ this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
+ this.observers = observers == null ? Collections.emptySet() : observers;
}
public static SSTableWriter create(Descriptor descriptor,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 7348027,f8db26b..7d7cf8a
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -35,15 -35,18 +35,18 @@@ import org.apache.cassandra.config.CFMe
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
+ import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
import org.apache.cassandra.io.util.DataInputPlus;
+ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
- import org.apache.cassandra.utils.BytesReadTracker;
+ import org.apache.cassandra.io.util.TrackedInputStream;
+ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@@ -124,7 -129,7 +127,7 @@@ public class StreamReade
{
if (deserializer != null)
logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-- session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
++ session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
if (writer != null)
{
writer.abort(e);
@@@ -142,10 -157,9 +155,10 @@@
Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
- return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
++ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata));
+ StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
+ return writer;
}
protected void drain(InputStream dis, long bytesRead) throws IOException
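
The move from utils.BytesReadTracker to io.util.TrackedInputStream in this hunk keeps the same idea: wrap the incoming stream so the session can report how many bytes were consumed regardless of what the deserializer does with them. A minimal sketch of such a counting wrapper (TrackedStreamSketch is a hypothetical name, and this simplification omits the reset support the real class provides):

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Minimal byte-counting wrapper in the spirit of TrackedInputStream:
// every successful read bumps a counter the streaming session can poll.
public class TrackedStreamSketch extends FilterInputStream
{
    private long bytesRead;

    public TrackedStreamSketch(InputStream in) { super(in); }

    @Override
    public int read() throws IOException
    {
        int b = super.read();
        if (b >= 0) bytesRead++;
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException
    {
        int n = super.read(buf, off, len);
        if (n > 0) bytesRead += n;
        return n;
    }

    public long getBytesRead() { return bytesRead; }

    public static void main(String[] args) throws IOException
    {
        TrackedStreamSketch in = new TrackedStreamSketch(new ByteArrayInputStream(new byte[42]));
        while (in.read(new byte[16], 0, 16) != -1) { /* drain */ }
        System.out.println(in.getBytesRead()); // prints 42
    }
}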
http://git-wip-us.apache.org/repos/asf/cassandra/blob/587773fa/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 5a47787,9719587..318484f
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,9 -24,7 +24,8 @@@ import java.nio.channels.ReadableByteCh
import com.google.common.base.Throwables;
- import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;