You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/17 17:07:05 UTC
[09/12] cassandra git commit: Support streaming of older version sstables in 3.0
Support streaming of older version sstables in 3.0
patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-10990
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8651b66
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8651b66
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8651b66
Branch: refs/heads/trunk
Commit: e8651b6625c7f6260852f2a9c45fb189c63ab528
Parents: 7f1339c
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Feb 5 12:38:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 17 10:04:44 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +
.../org/apache/cassandra/db/Directories.java | 30 +
.../cassandra/db/SerializationHeader.java | 5 +
.../org/apache/cassandra/db/Serializers.java | 114 ++--
.../columniterator/AbstractSSTableIterator.java | 4 +-
.../cassandra/hints/ChecksummedDataInput.java | 8 +-
.../org/apache/cassandra/hints/HintMessage.java | 4 +-
.../io/compress/CompressedSequentialWriter.java | 8 +-
.../io/sstable/SSTableSimpleIterator.java | 11 +-
.../io/sstable/format/SSTableReader.java | 2 +-
.../io/sstable/format/SSTableWriter.java | 2 +-
.../io/sstable/format/big/BigTableWriter.java | 4 +-
.../cassandra/io/util/BytesReadTracker.java | 30 +
.../apache/cassandra/io/util/DataPosition.java | 21 +
.../apache/cassandra/io/util/FileDataInput.java | 8 +-
.../org/apache/cassandra/io/util/FileMark.java | 20 -
.../io/util/FileSegmentInputStream.java | 6 +-
.../cassandra/io/util/RandomAccessReader.java | 8 +-
.../cassandra/io/util/RewindableDataInput.java | 30 +
.../io/util/RewindableDataInputStreamPlus.java | 569 +++++++++++++++++++
.../cassandra/io/util/SequentialWriter.java | 6 +-
.../cassandra/io/util/TrackedDataInputPlus.java | 150 +++++
.../cassandra/io/util/TrackedInputStream.java | 76 +++
.../cassandra/service/StorageService.java | 1 +
.../cassandra/streaming/StreamReader.java | 84 ++-
.../compress/CompressedStreamReader.java | 18 +-
.../streaming/messages/FileMessageHeader.java | 4 +-
.../apache/cassandra/tools/nodetool/Repair.java | 2 +-
.../cassandra/utils/BytesReadTracker.java | 153 -----
.../cassandra/utils/CloseableIterator.java | 1 -
...acy_jb_clust_compact-jb-1-CompressionInfo.db | Bin 0 -> 83 bytes
..._tables-legacy_jb_clust_compact-jb-1-Data.db | Bin 0 -> 5270 bytes
...ables-legacy_jb_clust_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...tables-legacy_jb_clust_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
...s-legacy_jb_clust_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
...bles-legacy_jb_clust_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
..._tables-legacy_jb_clust_compact-jb-1-TOC.txt | 7 +
...lust_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 75 bytes
...legacy_jb_clust_counter_compact-jb-1-Data.db | Bin 0 -> 4228 bytes
...gacy_jb_clust_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...egacy_jb_clust_counter_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
..._jb_clust_counter_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
...acy_jb_clust_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
...legacy_jb_clust_counter_compact-jb-1-TOC.txt | 7 +
...cy_jb_simple_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
...tables-legacy_jb_simple_compact-jb-1-Data.db | Bin 0 -> 108 bytes
...bles-legacy_jb_simple_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...ables-legacy_jb_simple_compact-jb-1-Index.db | Bin 0 -> 75 bytes
...-legacy_jb_simple_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
...les-legacy_jb_simple_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
...tables-legacy_jb_simple_compact-jb-1-TOC.txt | 7 +
...mple_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
...egacy_jb_simple_counter_compact-jb-1-Data.db | Bin 0 -> 118 bytes
...acy_jb_simple_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
...gacy_jb_simple_counter_compact-jb-1-Index.db | Bin 0 -> 75 bytes
...jb_simple_counter_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
...cy_jb_simple_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
...egacy_jb_simple_counter_compact-jb-1-TOC.txt | 7 +
...acy_ka_clust_compact-ka-1-CompressionInfo.db | Bin 0 -> 83 bytes
..._tables-legacy_ka_clust_compact-ka-1-Data.db | Bin 0 -> 5277 bytes
...les-legacy_ka_clust_compact-ka-1-Digest.sha1 | 1 +
...ables-legacy_ka_clust_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...tables-legacy_ka_clust_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
...s-legacy_ka_clust_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
...bles-legacy_ka_clust_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
..._tables-legacy_ka_clust_compact-ka-1-TOC.txt | 8 +
...lust_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 75 bytes
...legacy_ka_clust_counter_compact-ka-1-Data.db | Bin 0 -> 4527 bytes
...cy_ka_clust_counter_compact-ka-1-Digest.sha1 | 1 +
...gacy_ka_clust_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...egacy_ka_clust_counter_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
..._ka_clust_counter_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
...acy_ka_clust_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
...legacy_ka_clust_counter_compact-ka-1-TOC.txt | 8 +
...cy_ka_simple_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
...tables-legacy_ka_simple_compact-ka-1-Data.db | Bin 0 -> 105 bytes
...es-legacy_ka_simple_compact-ka-1-Digest.sha1 | 1 +
...bles-legacy_ka_simple_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...ables-legacy_ka_simple_compact-ka-1-Index.db | Bin 0 -> 75 bytes
...-legacy_ka_simple_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
...les-legacy_ka_simple_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
...tables-legacy_ka_simple_compact-ka-1-TOC.txt | 8 +
...mple_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
...egacy_ka_simple_counter_compact-ka-1-Data.db | Bin 0 -> 124 bytes
...y_ka_simple_counter_compact-ka-1-Digest.sha1 | 1 +
...acy_ka_simple_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
...gacy_ka_simple_counter_compact-ka-1-Index.db | Bin 0 -> 75 bytes
...ka_simple_counter_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
...cy_ka_simple_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
...egacy_ka_simple_counter_compact-ka-1-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_la_clust_compact/la-1-big-Data.db | Bin 0 -> 5286 bytes
.../la-1-big-Digest.adler32 | 1 +
.../legacy_la_clust_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_la_clust_compact/la-1-big-Index.db | Bin 0 -> 157685 bytes
.../la-1-big-Statistics.db | Bin 0 -> 6859 bytes
.../legacy_la_clust_compact/la-1-big-Summary.db | Bin 0 -> 75 bytes
.../legacy_la_clust_compact/la-1-big-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../la-1-big-Data.db | Bin 0 -> 4527 bytes
.../la-1-big-Digest.adler32 | 1 +
.../la-1-big-Filter.db | Bin 0 -> 24 bytes
.../la-1-big-Index.db | Bin 0 -> 157685 bytes
.../la-1-big-Statistics.db | Bin 0 -> 6859 bytes
.../la-1-big-Summary.db | Bin 0 -> 75 bytes
.../la-1-big-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_la_simple_compact/la-1-big-Data.db | Bin 0 -> 106 bytes
.../la-1-big-Digest.adler32 | 1 +
.../legacy_la_simple_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_la_simple_compact/la-1-big-Index.db | Bin 0 -> 75 bytes
.../la-1-big-Statistics.db | Bin 0 -> 4453 bytes
.../la-1-big-Summary.db | Bin 0 -> 75 bytes
.../legacy_la_simple_compact/la-1-big-TOC.txt | 8 +
.../la-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../la-1-big-Data.db | Bin 0 -> 124 bytes
.../la-1-big-Digest.adler32 | 1 +
.../la-1-big-Filter.db | Bin 0 -> 24 bytes
.../la-1-big-Index.db | Bin 0 -> 75 bytes
.../la-1-big-Statistics.db | Bin 0 -> 4453 bytes
.../la-1-big-Summary.db | Bin 0 -> 75 bytes
.../la-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_ma_clust_compact/ma-1-big-Data.db | Bin 0 -> 5393 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../legacy_ma_clust_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_ma_clust_compact/ma-1-big-Index.db | Bin 0 -> 157553 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 7046 bytes
.../legacy_ma_clust_compact/ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_ma_clust_compact/ma-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../ma-1-big-Data.db | Bin 0 -> 4606 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../ma-1-big-Index.db | Bin 0 -> 157553 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 7055 bytes
.../ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../ma-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_ma_simple_compact/ma-1-big-Data.db | Bin 0 -> 91 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../legacy_ma_simple_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_ma_simple_compact/ma-1-big-Index.db | Bin 0 -> 26 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 4640 bytes
.../ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_ma_simple_compact/ma-1-big-TOC.txt | 8 +
.../ma-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../ma-1-big-Data.db | Bin 0 -> 114 bytes
.../ma-1-big-Digest.crc32 | 1 +
.../ma-1-big-Filter.db | Bin 0 -> 24 bytes
.../ma-1-big-Index.db | Bin 0 -> 27 bytes
.../ma-1-big-Statistics.db | Bin 0 -> 4649 bytes
.../ma-1-big-Summary.db | Bin 0 -> 47 bytes
.../ma-1-big-TOC.txt | 8 +
.../cassandra/AbstractSerializationsTester.java | 1 -
.../apache/cassandra/db/DirectoriesTest.java | 98 ++--
.../cassandra/gms/SerializationsTest.java | 1 -
.../CompressedRandomAccessReaderTest.java | 6 +-
.../CompressedSequentialWriterTest.java | 4 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 368 ++++++------
.../io/util/BufferedRandomAccessFileTest.java | 4 +-
.../io/util/RandomAccessReaderTest.java | 2 +-
.../util/RewindableDataInputStreamPlusTest.java | 539 ++++++++++++++++++
.../cassandra/utils/BytesReadTrackerTest.java | 104 +++-
165 files changed, 2101 insertions(+), 544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index defc25a..51cfc16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.5
+ * Support streaming pre-3.0 sstables (CASSANDRA-10990)
* Add backpressure to compressed commit log (CASSANDRA-10971)
* SSTableExport supports secondary index tables (CASSANDRA-11330)
* Fix sstabledump to include missing info in debug output (CASSANDRA-11321)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c564d8d..f28df1c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -571,6 +571,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
clearEphemeralSnapshots(directories);
+ directories.removeTemporaryDirectories();
+
logger.trace("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName);
LifecycleTransaction.removeUnfinishedLeftovers(metadata);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8744d43..83321ac 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -94,9 +94,11 @@ public class Directories
public static final String BACKUPS_SUBDIR = "backups";
public static final String SNAPSHOT_SUBDIR = "snapshots";
+ public static final String TMP_SUBDIR = "tmp";
public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
public static final DataDirectory[] dataDirectories;
+
static
{
String[] locations = DatabaseDescriptor.getAllDataFileLocations();
@@ -322,6 +324,34 @@ public class Directories
}
/**
+ * Returns a temporary subdirectory of a non-blacklisted data directory
+ * that _currently_ has {@code writeSize} bytes as usable space.
+ * This method does not create the temporary directory.
+ *
+ * @param writeSize the usable space, in bytes, the chosen data directory must currently have
+ * @return the {@code TMP_SUBDIR} subdirectory of the chosen location, or {@code null}
+ *         if {@code getLocationForDisk} returns {@code null} for that location
+ * @throws IOError if all directories are blacklisted.
+ */
+ public File getTemporaryWriteableDirectoryAsFile(long writeSize)
+ {
+ File location = getLocationForDisk(getWriteableLocation(writeSize));
+ if (location == null)
+ return null;
+ return new File(location, TMP_SUBDIR);
+ }
+
+ /**
+ * Recursively deletes the {@code TMP_SUBDIR} subdirectory of every path in {@code dataPaths}.
+ * Paths without such a subdirectory are skipped; each directory that is removed is logged
+ * at debug level first.
+ */
+ public void removeTemporaryDirectories()
+ {
+ for (File dataDir : dataPaths)
+ {
+ File tmpDir = new File(dataDir, TMP_SUBDIR);
+ if (tmpDir.exists())
+ {
+ logger.debug("Removing temporary directory {}", tmpDir);
+ FileUtils.deleteRecursive(tmpDir);
+ }
+ }
+ }
+
+ /**
* Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space.
*
* @throws IOError if all directories are blacklisted.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 6e03756..0fd1281 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -70,6 +70,11 @@ public class SerializationHeader
this.typeMap = typeMap;
}
+ /**
+ * Creates a header for {@code metadata} that covers all of the table's partition columns
+ * and carries no encoding statistics ({@code EncodingStats.NO_STATS}).
+ * Used as a stand-in when no serialized header is available, e.g. for pre-3.0 sstables
+ * received through streaming.
+ */
+ public static SerializationHeader makeWithoutStats(CFMetaData metadata)
+ {
+ return new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
+ }
+
public static SerializationHeader forKeyCache(CFMetaData metadata)
{
// We don't save type information in the key cache (we could change
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 9b29d89..348fda3 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -29,8 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.utils.ByteBufferUtil;
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-
/**
* Holds references on serializers that depend on the table definition.
*/
@@ -48,62 +46,77 @@ public class Serializers
// unecessary (since IndexInfo.Serializer won't depend on the metadata either).
public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header)
{
- if (!version.storeRows())
+ if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables
{
- return new ISerializer<ClusteringPrefix>()
+ return oldFormatSerializer(version);
+ }
+
+ return newFormatSerializer(version, header);
+ }
+
+ private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version)
+ {
+ return new ISerializer<ClusteringPrefix>()
+ {
+ SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata);
+
+ public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
{
- public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
- {
- // We should only use this for reading old sstable, never write new ones.
- throw new UnsupportedOperationException();
- }
+ //we deserialize in the old format and serialize in the new format
+ ClusteringPrefix.serializer.serialize(clustering, out,
+ version.correspondingMessagingVersion(),
+ newHeader.clusteringTypes());
+ }
+
+ public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+ {
+ // We're reading the old cellname/composite
+ ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
+ assert bb.hasRemaining(); // empty cellnames were invalid
+
+ int clusteringSize = metadata.clusteringColumns().size();
+ // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
+ if (clusteringSize == 0)
+ return Clustering.EMPTY;
- public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+ if (!metadata.isCompound())
+ return new Clustering(bb);
+
+ List<ByteBuffer> components = CompositeType.splitName(bb);
+ byte eoc = CompositeType.lastEOC(bb);
+
+ if (eoc == 0 || components.size() >= clusteringSize)
{
- // We're reading the old cellname/composite
- ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
- assert bb.hasRemaining(); // empty cellnames were invalid
-
- int clusteringSize = metadata.clusteringColumns().size();
- // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
- if (clusteringSize == 0)
- return Clustering.EMPTY;
-
- if (!metadata.isCompound())
- return new Clustering(bb);
-
- List<ByteBuffer> components = CompositeType.splitName(bb);
- byte eoc = CompositeType.lastEOC(bb);
-
- if (eoc == 0 || components.size() >= clusteringSize)
- {
- // That's a clustering.
- if (components.size() > clusteringSize)
- components = components.subList(0, clusteringSize);
-
- return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
- }
- else
- {
- // It's a range tombstone bound. It is a start since that's the only part we've ever included
- // in the index entries.
- Slice.Bound.Kind boundKind = eoc > 0
- ? Slice.Bound.Kind.EXCL_START_BOUND
- : Slice.Bound.Kind.INCL_START_BOUND;
-
- return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
- }
- }
+ // That's a clustering.
+ if (components.size() > clusteringSize)
+ components = components.subList(0, clusteringSize);
- public long serializedSize(ClusteringPrefix clustering)
+ return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
+ }
+ else
{
- // We should only use this for reading old sstable, never write new ones.
- throw new UnsupportedOperationException();
+ // It's a range tombstone bound. It is a start since that's the only part we've ever included
+ // in the index entries.
+ Slice.Bound.Kind boundKind = eoc > 0
+ ? Slice.Bound.Kind.EXCL_START_BOUND
+ : Slice.Bound.Kind.INCL_START_BOUND;
+
+ return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
}
- };
- }
+ }
- return new ISerializer<ClusteringPrefix>()
+ public long serializedSize(ClusteringPrefix clustering)
+ {
+ return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(),
+ newHeader.clusteringTypes());
+ }
+ };
+ }
+
+
+ private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header)
+ {
+ return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format
{
public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
{
@@ -121,4 +134,5 @@ public class Serializers
}
};
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 8ac3dcb..0e2012e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.utils.ByteBufferUtil;
abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
@@ -401,7 +401,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
private int currentIndexIdx;
// Marks the beginning of the block corresponding to currentIndexIdx.
- private FileMark mark;
+ private DataPosition mark;
public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 1dc6d1e..095d7f4 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,13 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
-import org.apache.cassandra.io.FSError;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.schema.CompressionParams;
/**
* A {@link RandomAccessReader} wrapper that calculates the CRC in place.
@@ -48,7 +44,7 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW
private boolean crcUpdateDisabled;
private long limit;
- private FileMark limitMark;
+ private DataPosition limitMark;
protected ChecksummedDataInput(Builder builder)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index e78738d..723ab6d 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
/**
@@ -117,7 +117,7 @@ public final class HintMessage
UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
long hintSize = in.readUnsignedVInt();
- BytesReadTracker countingIn = new BytesReadTracker(in);
+ TrackedDataInputPlus countingIn = new TrackedDataInputPlus(in);
try
{
return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 14f1ba7..9bd1145 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
@@ -153,7 +153,7 @@ public class CompressedSequentialWriter extends SequentialWriter
}
@Override
- public FileMark mark()
+ public DataPosition mark()
{
if (!buffer.hasRemaining())
doFlush(0);
@@ -161,7 +161,7 @@ public class CompressedSequentialWriter extends SequentialWriter
}
@Override
- public synchronized void resetAndTruncate(FileMark mark)
+ public synchronized void resetAndTruncate(DataPosition mark)
{
assert mark instanceof CompressedFileWriterMark;
@@ -306,7 +306,7 @@ public class CompressedSequentialWriter extends SequentialWriter
/**
* Class to hold a mark to the position of the file
*/
- protected static class CompressedFileWriterMark implements FileMark
+ protected static class CompressedFileWriterMark implements DataPosition
{
// chunk offset in the compressed file
final long chunkOffset;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 365d469..f82db4e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.IOError;
import java.util.Iterator;
+import org.apache.cassandra.io.util.RewindableDataInput;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.config.CFMetaData;
@@ -29,7 +30,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.net.MessagingService;
/**
@@ -113,11 +114,9 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
// need to extract them. Which imply 2 passes (one to extract the static, then one for other value).
if (metadata.isStaticCompactTable())
{
- // Because we don't support streaming from old file version, the only case we should get there is for compaction,
- // where the DataInput should be a file based one.
- assert in instanceof FileDataInput;
- FileDataInput file = (FileDataInput)in;
- FileMark mark = file.mark();
+ assert in instanceof RewindableDataInput;
+ RewindableDataInput file = (RewindableDataInput)in;
+ DataPosition mark = file.mark();
Row staticRow = LegacyLayout.extractStaticColumns(metadata, file, metadata.partitionColumns().statics);
file.reset(mark);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 8a778b7..b9561ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -427,7 +427,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
System.currentTimeMillis(),
statsMetadata,
OpenReason.NORMAL,
- header.toHeader(metadata));
+ header == null? null : header.toHeader(metadata));
// special implementation of load to use non-pooled SegmentedFile builders
try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 4cbbd70..5f35029 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -80,7 +80,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
this.keyCount = keyCount;
this.repairedAt = repairedAt;
this.metadataCollector = metadataCollector;
- this.header = header;
+ this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 2335e47..d3630d7 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -55,7 +55,7 @@ public class BigTableWriter extends SSTableWriter
private final SegmentedFile.Builder dbuilder;
protected final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
- private FileMark dataMark;
+ private DataPosition dataMark;
public BigTableWriter(Descriptor descriptor,
Long keyCount,
@@ -368,7 +368,7 @@ public class BigTableWriter extends SSTableWriter
public final SegmentedFile.Builder builder;
public final IndexSummaryBuilder summary;
public final IFilter bf;
- private FileMark mark;
+ private DataPosition mark;
IndexWriter(long keyCount, final SequentialWriter dataFile)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/BytesReadTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BytesReadTracker.java b/src/java/org/apache/cassandra/io/util/BytesReadTracker.java
new file mode 100644
index 0000000..fc83856
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/BytesReadTracker.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+/**
+ * Exposes a resettable count of bytes read.
+ */
+public interface BytesReadTracker
+{
+ /**
+ * @return the current value of the bytes-read counter
+ */
+ public long getBytesRead();
+
+ /**
+ * Resets the bytes-read counter.
+ *
+ * @param count the value to set the counter to
+ */
+ public void reset(long count);
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/DataPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataPosition.java b/src/java/org/apache/cassandra/io/util/DataPosition.java
new file mode 100644
index 0000000..e106dae
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataPosition.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+/**
+ * Opaque handle for a position in a data input or output. Each implementation decides what
+ * (if anything) the handle stores.
+ */
+public interface DataPosition
+{
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/FileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java
index f56193b..1059b01 100644
--- a/src/java/org/apache/cassandra/io/util/FileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.io.util;
import java.io.Closeable;
import java.io.IOException;
-public interface FileDataInput extends DataInputPlus, Closeable
+public interface FileDataInput extends RewindableDataInput, Closeable
{
String getPath();
@@ -30,11 +30,5 @@ public interface FileDataInput extends DataInputPlus, Closeable
void seek(long pos) throws IOException;
- FileMark mark();
-
- void reset(FileMark mark) throws IOException;
-
- long bytesPastMark(FileMark mark);
-
long getFilePointer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/FileMark.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileMark.java b/src/java/org/apache/cassandra/io/util/FileMark.java
deleted file mode 100644
index 781bc1e..0000000
--- a/src/java/org/apache/cassandra/io/util/FileMark.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-public interface FileMark {}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
index 425c7d6..a585215 100644
--- a/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
@@ -74,17 +74,17 @@ public class FileSegmentInputStream extends DataInputBuffer implements FileDataI
return false;
}
- public FileMark mark()
+ public DataPosition mark()
{
throw new UnsupportedOperationException();
}
- public void reset(FileMark mark)
+ public void reset(DataPosition mark)
{
throw new UnsupportedOperationException();
}
- public long bytesPastMark(FileMark mark)
+ public long bytesPastMark(DataPosition mark)
{
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index b495bf0..1943773 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -200,19 +200,19 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
return bytes;
}
- public FileMark mark()
+ public DataPosition mark()
{
markedPointer = current();
return new BufferedRandomAccessFileMark(markedPointer);
}
- public void reset(FileMark mark)
+ public void reset(DataPosition mark)
{
assert mark instanceof BufferedRandomAccessFileMark;
seek(((BufferedRandomAccessFileMark) mark).pointer);
}
- public long bytesPastMark(FileMark mark)
+ public long bytesPastMark(DataPosition mark)
{
assert mark instanceof BufferedRandomAccessFileMark;
long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer;
@@ -262,7 +262,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
/**
* Class to hold a mark to the position of the file
*/
- protected static class BufferedRandomAccessFileMark implements FileMark
+ protected static class BufferedRandomAccessFileMark implements DataPosition
{
final long pointer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/RewindableDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RewindableDataInput.java b/src/java/org/apache/cassandra/io/util/RewindableDataInput.java
new file mode 100644
index 0000000..c202f60
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RewindableDataInput.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+
+/**
+ * A {@link DataInputPlus} that can be rewound to a previously marked position.
+ */
+public interface RewindableDataInput extends DataInputPlus
+{
+    /**
+     * Marks the current position of the input.
+     *
+     * @return a handle for the marked position, to be passed to {@link #reset(DataPosition)} and
+     *         {@link #bytesPastMark(DataPosition)}
+     */
+    DataPosition mark();
+
+    /**
+     * Rewinds the input to a marked position. Some implementations ignore the argument and
+     * rewind to their most recent mark.
+     *
+     * @param mark a position previously returned by {@link #mark()}
+     * @throws IOException if the input cannot be rewound
+     */
+    void reset(DataPosition mark) throws IOException;
+
+    /**
+     * @param mark a position previously returned by {@link #mark()}
+     * @return the number of bytes consumed since the mark
+     */
+    long bytesPastMark(DataPosition mark);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java b/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java
new file mode 100644
index 0000000..3a680f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * Adds mark/reset functionality to another input stream by caching read bytes to a memory buffer and
+ * spilling to disk if necessary.
+ *
+ * When the stream is marked via {@link #mark()} or {@link #mark(int)}, up to
+ * <code>maxMemBufferSize</code> bytes will be cached in memory (heap). If more than
+ * <code>maxMemBufferSize</code> bytes are read while the stream is marked, the
+ * following bytes are cached on the <code>spillFile</code> for up to <code>maxDiskBufferSize</code> bytes.
+ *
+ * Please note that successive calls to {@link #mark()} and {@link #reset()} will write
+ * sequentially to the same <code>spillFile</code> until <code>maxDiskBufferSize</code> is reached.
+ * At this point, if less than <code>maxDiskBufferSize</code> bytes are currently cached on the
+ * <code>spillFile</code>, the remaining bytes are written to the beginning of the file,
+ * treating the <code>spillFile</code> as a circular buffer.
+ *
+ * If more than <code>maxMemBufferSize + maxDiskBufferSize</code> bytes are read while the stream is marked,
+ * the following {@link #reset()} invocation will throw an {@link IOException}.
+ *
+ * Only one mark can be active at a time, and the stream can only be marked again after all the bytes
+ * cached by the previous mark have been read back.
+ */
+public class RewindableDataInputStreamPlus extends FilterInputStream implements RewindableDataInput, Closeable
+{
+    private boolean marked = false;
+    private boolean exhausted = false;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    // While marked, these hold the remaining caching capacity of each region. While not marked, they
+    // hold the number of cached bytes still to be replayed from each region. A value of -1 for the
+    // disk counters means the spill file has not been used since the last mark.
+    protected int memAvailable = 0;
+    protected int diskTailAvailable = 0;
+    protected int diskHeadAvailable = 0;
+
+    private final File spillFile;
+    private final int initialMemBufferSize;
+    private final int maxMemBufferSize;
+    private final int maxDiskBufferSize;
+
+    private volatile byte[] memBuffer;
+    private int memBufferSize;
+    private RandomAccessFile spillBuffer;
+
+    private final DataInputPlus dataReader;
+
+    public RewindableDataInputStreamPlus(InputStream in, int initialMemBufferSize, int maxMemBufferSize,
+                                         File spillFile, int maxDiskBufferSize)
+    {
+        super(in);
+        this.initialMemBufferSize = initialMemBufferSize;
+        this.maxMemBufferSize = maxMemBufferSize;
+        this.spillFile = spillFile;
+        this.maxDiskBufferSize = maxDiskBufferSize;
+        // typed reads are decoded on top of this stream, so they also go through the caching logic
+        this.dataReader = new DataInputStreamPlus(this);
+    }
+
+    /* RewindableDataInput methods */
+
+    /**
+     * Marks the current position of the stream, to return to it later via {@link #reset(DataPosition)}.
+     *
+     * @return an empty {@link DataPosition}; it carries no state because only one mark is supported
+     */
+    public DataPosition mark()
+    {
+        mark(0);
+        return new RewindableDataInputPlusMark();
+    }
+
+    /**
+     * Rewinds to the position previously marked via {@link #mark()}.
+     *
+     * @param mark ignored; it is not possible to return to a custom position
+     * @throws IOException if an error occurs while resetting
+     */
+    public void reset(DataPosition mark) throws IOException
+    {
+        reset();
+    }
+
+    /**
+     * Returns the number of bytes cached (in memory and on disk) since the stream was marked.
+     * The argument is ignored, and the result is only meaningful while the stream is marked.
+     */
+    public long bytesPastMark(DataPosition mark)
+    {
+        long diskCached = diskTailAvailable == -1 ? 0 : maxDiskBufferSize - diskHeadAvailable - diskTailAvailable;
+        return maxMemBufferSize - memAvailable + diskCached;
+    }
+
+    protected static class RewindableDataInputPlusMark implements DataPosition
+    {
+    }
+
+    /* InputStream methods */
+
+    public boolean markSupported()
+    {
+        return true;
+    }
+
+    /**
+     * Marks the current position of the stream, to return to it later via {@link #reset()}.
+     *
+     * @param readlimit ignored; the caching capacity is fixed by the constructor arguments
+     * @throws IllegalStateException if the stream is already marked, or if bytes cached by a previous
+     *         mark have not been read back yet
+     */
+    public synchronized void mark(int readlimit)
+    {
+        if (marked)
+            throw new IllegalStateException("Cannot mark already marked stream.");
+
+        if (memAvailable > 0 || diskHeadAvailable > 0 || diskTailAvailable > 0)
+            throw new IllegalStateException("Can only mark stream after reading previously marked data.");
+
+        marked = true;
+        memAvailable = maxMemBufferSize;
+        diskHeadAvailable = -1;
+        diskTailAvailable = -1;
+    }
+
+    /**
+     * Rewinds to the marked position. The cached bytes are replayed by subsequent reads.
+     *
+     * @throws IOException if the stream is not marked, if more bytes were read than can be cached,
+     *         or if the spill file cannot be repositioned
+     */
+    public synchronized void reset() throws IOException
+    {
+        if (!marked)
+            throw new IOException("Must call mark() before calling reset().");
+
+        if (exhausted)
+            throw new IOException(String.format("Read more than capacity: %d bytes.", maxMemBufferSize + maxDiskBufferSize));
+
+        // from now on memAvailable counts the cached bytes to replay instead of the remaining capacity
+        memAvailable = maxMemBufferSize - memAvailable;
+        memBufferSize = memAvailable;
+
+        if (diskTailAvailable == -1)
+        {
+            diskHeadAvailable = 0;
+            diskTailAvailable = 0;
+        }
+        else
+        {
+            // If the tail region was not filled up, the data started at diskHeadAvailable (the file pointer at
+            // the time spilling started). Otherwise the writer wrapped around and the file pointer is at the
+            // end of the head-region data.
+            RandomAccessFile spill = getIfNotClosed(spillBuffer);
+            int initialPos = diskTailAvailable > 0 ? 0 : (int) spill.getFilePointer();
+            int diskMarkpos = initialPos + diskHeadAvailable;
+            spill.seek(diskMarkpos);
+
+            diskHeadAvailable = diskMarkpos - diskHeadAvailable;
+            diskTailAvailable = (maxDiskBufferSize - diskTailAvailable) - diskMarkpos;
+        }
+
+        marked = false;
+    }
+
+    public int available() throws IOException
+    {
+        return super.available() + (marked ? 0 : memAvailable + diskHeadAvailable + diskTailAvailable);
+    }
+
+    public int read() throws IOException
+    {
+        int read = readOne();
+        if (read == -1)
+            return read;
+
+        if (marked)
+        {
+            // mark exhausted: stop caching, the next reset() will fail
+            if (isExhausted(1))
+            {
+                exhausted = true;
+                return read;
+            }
+
+            writeOne(read);
+        }
+
+        return read;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int readBytes = readMulti(b, off, len);
+        if (readBytes == -1)
+            return readBytes;
+
+        if (marked)
+        {
+            // check we have space on buffer
+            if (isExhausted(readBytes))
+            {
+                exhausted = true;
+                return readBytes;
+            }
+
+            writeMulti(b, off, readBytes);
+        }
+
+        return readBytes;
+    }
+
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+
+    /** Lazily creates the spill file (and its parent directory) the first time bytes must go to disk. */
+    private void maybeCreateDiskBuffer() throws IOException
+    {
+        if (spillBuffer == null)
+        {
+            File parent = spillFile.getParentFile();
+            if (parent != null && !parent.exists())
+                parent.mkdirs();
+            spillFile.createNewFile();
+
+            this.spillBuffer = new RandomAccessFile(spillFile, "rw");
+        }
+    }
+
+    /**
+     * Starts spilling to disk for the current mark if it has not started yet. The disk region is split
+     * into a tail (file pointer to end of buffer) and a head (start of file up to the file pointer).
+     */
+    private void maybeStartSpilling() throws IOException
+    {
+        if (diskTailAvailable == -1)
+        {
+            maybeCreateDiskBuffer();
+            diskHeadAvailable = (int) spillBuffer.getFilePointer();
+            diskTailAvailable = maxDiskBufferSize - diskHeadAvailable;
+        }
+    }
+
+    /** Reads one byte from the replay cache when not marked and cached bytes remain; otherwise from the wrapped stream. */
+    private int readOne() throws IOException
+    {
+        if (!marked)
+        {
+            if (memAvailable > 0)
+            {
+                int pos = memBufferSize - memAvailable;
+                memAvailable--;
+                return getIfNotClosed(memBuffer)[pos] & 0xff;
+            }
+
+            if (diskTailAvailable > 0 || diskHeadAvailable > 0)
+            {
+                RandomAccessFile spill = getIfNotClosed(spillBuffer);
+                int read = spill.read();
+                if (diskTailAvailable > 0)
+                {
+                    diskTailAvailable--;
+                    // end of the tail region: wrap around to the head region, only on this transition
+                    if (diskTailAvailable == 0)
+                        spill.seek(0);
+                }
+                else
+                {
+                    diskHeadAvailable--;
+                }
+                return read;
+            }
+        }
+
+        return getIfNotClosed(in).read();
+    }
+
+    private boolean isExhausted(int readBytes)
+    {
+        return exhausted || readBytes > memAvailable + (long) (diskTailAvailable == -1 ? maxDiskBufferSize : diskTailAvailable + diskHeadAvailable);
+    }
+
+    /**
+     * Reads up to {@code len} bytes, serving cached bytes first (when not marked) and then the wrapped stream.
+     *
+     * @return the number of bytes read, or -1 if nothing was available and the wrapped stream is at its end
+     */
+    private int readMulti(byte[] b, int off, int len) throws IOException
+    {
+        int readBytes = 0;
+        if (!marked)
+        {
+            if (memAvailable > 0)
+            {
+                int readFromMem = Math.min(memAvailable, len);
+                int pos = memBufferSize - memAvailable;
+                System.arraycopy(getIfNotClosed(memBuffer), pos, b, off, readFromMem);
+                readBytes += readFromMem;
+                memAvailable -= readFromMem;
+                off += readFromMem;
+                len -= readFromMem;
+            }
+            if (len > 0 && diskTailAvailable > 0)
+            {
+                int readFromTail = Math.min(diskTailAvailable, len);
+                RandomAccessFile spill = getIfNotClosed(spillBuffer);
+                // readFully: a plain read() may legally return fewer bytes than requested
+                spill.readFully(b, off, readFromTail);
+                readBytes += readFromTail;
+                diskTailAvailable -= readFromTail;
+                off += readFromTail;
+                len -= readFromTail;
+                if (diskTailAvailable == 0)
+                    spill.seek(0);
+            }
+            if (len > 0 && diskHeadAvailable > 0)
+            {
+                int readFromHead = Math.min(diskHeadAvailable, len);
+                getIfNotClosed(spillBuffer).readFully(b, off, readFromHead);
+                readBytes += readFromHead;
+                diskHeadAvailable -= readFromHead;
+                off += readFromHead;
+                len -= readFromHead;
+            }
+        }
+
+        if (len > 0)
+        {
+            int readFromSource = getIfNotClosed(in).read(b, off, len);
+            // do not let an end-of-stream marker (-1) eat into bytes already served from the cache
+            if (readFromSource == -1)
+                return readBytes > 0 ? readBytes : -1;
+            readBytes += readFromSource;
+        }
+
+        return readBytes;
+    }
+
+    /** Caches {@code len} bytes read while marked: memory first, then the disk tail, then the disk head. */
+    private void writeMulti(byte[] b, int off, int len) throws IOException
+    {
+        if (memAvailable > 0)
+        {
+            if (memBuffer == null)
+                memBuffer = new byte[initialMemBufferSize];
+            int pos = maxMemBufferSize - memAvailable;
+            int memWritten = Math.min(memAvailable, len);
+            if (pos + memWritten > getIfNotClosed(memBuffer).length)
+                growMemBuffer(pos, memWritten);
+            System.arraycopy(b, off, memBuffer, pos, memWritten);
+            off += memWritten;
+            len -= memWritten;
+            memAvailable -= memWritten;
+        }
+
+        if (len > 0)
+        {
+            maybeStartSpilling();
+
+            if (diskTailAvailable > 0)
+            {
+                int diskTailWritten = Math.min(diskTailAvailable, len);
+                RandomAccessFile spill = getIfNotClosed(spillBuffer);
+                spill.write(b, off, diskTailWritten);
+                off += diskTailWritten;
+                len -= diskTailWritten;
+                diskTailAvailable -= diskTailWritten;
+                if (diskTailAvailable == 0)
+                    spill.seek(0);
+            }
+
+            if (len > 0 && diskHeadAvailable > 0)
+            {
+                int diskHeadWritten = Math.min(diskHeadAvailable, len);
+                getIfNotClosed(spillBuffer).write(b, off, diskHeadWritten);
+                diskHeadAvailable -= diskHeadWritten;
+            }
+        }
+    }
+
+    /** Caches a single byte read while marked: memory first, then the disk tail, then the disk head. */
+    private void writeOne(int value) throws IOException
+    {
+        if (memAvailable > 0)
+        {
+            if (memBuffer == null)
+                memBuffer = new byte[initialMemBufferSize];
+            int pos = maxMemBufferSize - memAvailable;
+            if (pos == getIfNotClosed(memBuffer).length)
+                growMemBuffer(pos, 1);
+            getIfNotClosed(memBuffer)[pos] = (byte) value;
+            memAvailable--;
+            return;
+        }
+
+        maybeStartSpilling();
+
+        if (diskTailAvailable > 0)
+        {
+            RandomAccessFile spill = getIfNotClosed(spillBuffer);
+            spill.write(value);
+            diskTailAvailable--;
+            // end of the tail region: wrap around to the head region, only on this transition
+            if (diskTailAvailable == 0)
+                spill.seek(0);
+        }
+        else if (diskHeadAvailable > 0)
+        {
+            getIfNotClosed(spillBuffer).write(value);
+            diskHeadAvailable--;
+        }
+    }
+
+    /** Grows the memory buffer so that {@code writeSize} bytes fit at {@code pos}, capped at maxMemBufferSize. */
+    private void growMemBuffer(int pos, int writeSize)
+    {
+        // long arithmetic avoids int overflow when doubling large buffers
+        int newSize = (int) Math.min(2L * (pos + writeSize), maxMemBufferSize);
+        byte[] newBuffer = new byte[newSize];
+        System.arraycopy(memBuffer, 0, newBuffer, 0, pos);
+        memBuffer = newBuffer;
+    }
+
+    /**
+     * Skips up to {@code n} bytes. While marked, skipped bytes are read (and cached) one by one so that
+     * they can be replayed after {@link #reset()}.
+     *
+     * @return the number of bytes skipped; 0 if {@code n} is not positive
+     */
+    public long skip(long n) throws IOException
+    {
+        if (n <= 0)
+            return 0;
+
+        long skipped = 0;
+
+        if (marked)
+        {
+            // if marked, we need to cache skipped bytes
+            while (n-- > 0 && read() != -1)
+                skipped++;
+            return skipped;
+        }
+
+        if (memAvailable > 0)
+        {
+            int skipFromMem = (int) Math.min(memAvailable, n);
+            memAvailable -= skipFromMem;
+            skipped += skipFromMem;
+            n -= skipFromMem;
+        }
+        if (n > 0 && diskTailAvailable > 0)
+        {
+            int skipFromTail = (int) Math.min(diskTailAvailable, n);
+            RandomAccessFile spill = getIfNotClosed(spillBuffer);
+            spill.seek(spill.getFilePointer() + skipFromTail);
+            diskTailAvailable -= skipFromTail;
+            skipped += skipFromTail;
+            n -= skipFromTail;
+            if (diskTailAvailable == 0)
+                spill.seek(0);
+        }
+        if (n > 0 && diskHeadAvailable > 0)
+        {
+            int skipFromHead = (int) Math.min(diskHeadAvailable, n);
+            RandomAccessFile spill = getIfNotClosed(spillBuffer);
+            spill.seek(spill.getFilePointer() + skipFromHead);
+            diskHeadAvailable -= skipFromHead;
+            skipped += skipFromHead;
+            n -= skipFromHead;
+        }
+
+        if (n > 0)
+            skipped += getIfNotClosed(in).skip(n);
+
+        return skipped;
+    }
+
+    private <T> T getIfNotClosed(T in) throws IOException
+    {
+        if (closed.get())
+            throw new IOException("Stream closed");
+        return in;
+    }
+
+    public void close() throws IOException
+    {
+        close(true);
+    }
+
+    /**
+     * Closes this stream, closing and deleting the spill file. Only the first invocation has an effect.
+     *
+     * @param closeUnderlying whether the wrapped input stream is closed as well
+     * @throws IOException with any suppressed failures merged, if any cleanup step failed
+     */
+    public void close(boolean closeUnderlying) throws IOException
+    {
+        if (closed.compareAndSet(false, true))
+        {
+            Throwable fail = null;
+            if (closeUnderlying)
+            {
+                try
+                {
+                    super.close();
+                }
+                catch (IOException e)
+                {
+                    fail = merge(fail, e);
+                }
+            }
+            try
+            {
+                if (spillBuffer != null)
+                {
+                    this.spillBuffer.close();
+                    this.spillBuffer = null;
+                }
+            }
+            catch (IOException e)
+            {
+                fail = merge(fail, e);
+            }
+            try
+            {
+                if (spillFile.exists())
+                    spillFile.delete();
+            }
+            catch (Throwable e)
+            {
+                fail = merge(fail, e);
+            }
+            maybeFail(fail, IOException.class);
+        }
+    }
+
+    /* DataInputPlus methods */
+
+    public void readFully(byte[] b) throws IOException
+    {
+        dataReader.readFully(b);
+    }
+
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        dataReader.readFully(b, off, len);
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        return dataReader.skipBytes(n);
+    }
+
+    public boolean readBoolean() throws IOException
+    {
+        return dataReader.readBoolean();
+    }
+
+    public byte readByte() throws IOException
+    {
+        return dataReader.readByte();
+    }
+
+    public int readUnsignedByte() throws IOException
+    {
+        return dataReader.readUnsignedByte();
+    }
+
+    public short readShort() throws IOException
+    {
+        return dataReader.readShort();
+    }
+
+    public int readUnsignedShort() throws IOException
+    {
+        return dataReader.readUnsignedShort();
+    }
+
+    public char readChar() throws IOException
+    {
+        return dataReader.readChar();
+    }
+
+    public int readInt() throws IOException
+    {
+        return dataReader.readInt();
+    }
+
+    public long readLong() throws IOException
+    {
+        return dataReader.readLong();
+    }
+
+    public float readFloat() throws IOException
+    {
+        return dataReader.readFloat();
+    }
+
+    public double readDouble() throws IOException
+    {
+        return dataReader.readDouble();
+    }
+
+    public String readLine() throws IOException
+    {
+        return dataReader.readLine();
+    }
+
+    public String readUTF() throws IOException
+    {
+        return dataReader.readUTF();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dd49868..26316a2 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -297,7 +297,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
return bufferOffset + (buffer == null ? 0 : buffer.position());
}
- public FileMark mark()
+ public DataPosition mark()
{
return new BufferedFileWriterMark(current());
}
@@ -306,7 +306,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
* Drops all buffered data that's past the limits of our new file mark + buffer capacity, or syncs and truncates
* the underlying file to the marked position
*/
- public void resetAndTruncate(FileMark mark)
+ public void resetAndTruncate(DataPosition mark)
{
assert mark instanceof BufferedFileWriterMark;
@@ -404,7 +404,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
/**
* Class to hold a mark to the position of the file
*/
- protected static class BufferedFileWriterMark implements FileMark
+ protected static class BufferedFileWriterMark implements DataPosition
{
final long pointer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java b/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java
new file mode 100644
index 0000000..dc5bbb6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * A {@link DataInputPlus} decorator that forwards every read to a wrapped {@link DataInput} and keeps a
+ * running count of the bytes consumed by reads that completed without throwing.
+ *
+ * {@link #readLine()} is not supported because the number of bytes it consumes cannot be tracked.
+ */
+public class TrackedDataInputPlus implements DataInputPlus, BytesReadTracker
+{
+    private long bytesRead;
+    final DataInput source;
+
+    public TrackedDataInputPlus(DataInput source)
+    {
+        this.source = source;
+    }
+
+    /** @return the number of bytes consumed since construction or the last {@link #reset(long)} */
+    public long getBytesRead()
+    {
+        return bytesRead;
+    }
+
+    /**
+     * Overwrites the byte counter.
+     *
+     * @param count the value the counter is set to
+     */
+    public void reset(long count)
+    {
+        bytesRead = count;
+    }
+
+    public boolean readBoolean() throws IOException
+    {
+        final boolean value = source.readBoolean();
+        bytesRead += Byte.BYTES;
+        return value;
+    }
+
+    public byte readByte() throws IOException
+    {
+        final byte value = source.readByte();
+        bytesRead += Byte.BYTES;
+        return value;
+    }
+
+    public char readChar() throws IOException
+    {
+        final char value = source.readChar();
+        bytesRead += Character.BYTES;
+        return value;
+    }
+
+    public double readDouble() throws IOException
+    {
+        final double value = source.readDouble();
+        bytesRead += Double.BYTES;
+        return value;
+    }
+
+    public float readFloat() throws IOException
+    {
+        final float value = source.readFloat();
+        bytesRead += Float.BYTES;
+        return value;
+    }
+
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        source.readFully(b, off, len);
+        // readFully either fills exactly len bytes or throws, so len is the exact count
+        bytesRead += len;
+    }
+
+    public void readFully(byte[] b) throws IOException
+    {
+        source.readFully(b);
+        bytesRead += b.length;
+    }
+
+    public int readInt() throws IOException
+    {
+        final int value = source.readInt();
+        bytesRead += Integer.BYTES;
+        return value;
+    }
+
+    public String readLine() throws IOException
+    {
+        // readLine is deprecated and its consumed length cannot be tracked, so refuse it outright
+        throw new UnsupportedOperationException();
+    }
+
+    public long readLong() throws IOException
+    {
+        final long value = source.readLong();
+        bytesRead += Long.BYTES;
+        return value;
+    }
+
+    public short readShort() throws IOException
+    {
+        final short value = source.readShort();
+        bytesRead += Short.BYTES;
+        return value;
+    }
+
+    public String readUTF() throws IOException
+    {
+        // decode through this instance so the length prefix and payload reads are counted
+        return DataInputStream.readUTF(this);
+    }
+
+    public int readUnsignedByte() throws IOException
+    {
+        final int value = source.readUnsignedByte();
+        bytesRead += Byte.BYTES;
+        return value;
+    }
+
+    public int readUnsignedShort() throws IOException
+    {
+        final int value = source.readUnsignedShort();
+        bytesRead += Short.BYTES;
+        return value;
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        final int actuallySkipped = source.skipBytes(n);
+        bytesRead += actuallySkipped;
+        return actuallySkipped;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/TrackedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/TrackedInputStream.java b/src/java/org/apache/cassandra/io/util/TrackedInputStream.java
new file mode 100644
index 0000000..f398d30
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/TrackedInputStream.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} wrapper that counts the bytes read or skipped from the wrapped stream.
+ *
+ * End-of-stream results (-1) are not counted. {@link #mark(int)} and {@link #reset()} are inherited
+ * unchanged from {@link FilterInputStream}, so rewinding the wrapped stream does not rewind the counter.
+ */
+public class TrackedInputStream extends FilterInputStream implements BytesReadTracker
+{
+    private long bytesRead;
+
+    public TrackedInputStream(InputStream source)
+    {
+        super(source);
+    }
+
+    /** @return the number of bytes read or skipped since construction or the last {@link #reset(long)} */
+    public long getBytesRead()
+    {
+        return bytesRead;
+    }
+
+    /**
+     * Overwrites the byte counter.
+     *
+     * @param count the value the counter is set to
+     */
+    public void reset(long count)
+    {
+        bytesRead = count;
+    }
+
+    public int read() throws IOException
+    {
+        int read = super.read();
+        // -1 signals end of stream, not a byte
+        if (read != -1)
+            bytesRead++;
+        return read;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int read = super.read(b, off, len);
+        // -1 signals end of stream and must not decrement the counter
+        if (read > 0)
+            bytesRead += read;
+        return read;
+    }
+
+    public int read(byte[] b) throws IOException
+    {
+        // FilterInputStream.read(byte[]) dispatches to the overridden read(byte[], int, int), which already
+        // counts the bytes; counting here as well would double-count them.
+        return read(b, 0, b.length);
+    }
+
+    public long skip(long n) throws IOException
+    {
+        long skip = super.skip(n);
+        bytesRead += skip;
+        return skip;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f445e25..fd7f4b6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4221,6 +4221,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Collections.unmodifiableList(keyspaceNamesList);
}
+
public List<String> getNonSystemKeyspaces()
{
List<String> keyspaceNamesList = new ArrayList<>(Schema.instance.getNonSystemKeyspaces());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 268f974..f8db26b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,10 +40,13 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.io.util.TrackedInputStream;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -105,9 +108,9 @@ public class StreamReader
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
cfs.getColumnFamilyName());
- DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
- BytesReadTracker in = new BytesReadTracker(dis);
- StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
+ TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+ totalSize, session.planId());
SSTableMultiWriter writer = null;
try
{
@@ -131,12 +134,22 @@ public class StreamReader
{
writer.abort(e);
}
- drain(dis, in.getBytesRead());
+ drain(in, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ if (deserializer != null)
+ deserializer.cleanup();
+ }
+ }
+
+ protected SerializationHeader getHeader(CFMetaData metadata)
+ {
+ return header == null ? null : header.toHeader(metadata); // legacy (pre-3.0) sstables are streamed without a SerializationHeader, so there is nothing to convert
}
protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
@@ -146,8 +159,7 @@ public class StreamReader
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-
- return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+ return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
}
protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -185,6 +197,13 @@ public class StreamReader
public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
{
+ public static final int INITIAL_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.initial_mem_buffer_size", 32768);
+ public static final int MAX_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.max_mem_buffer_size", 1048576);
+ public static final int MAX_SPILL_FILE_SIZE = Integer.getInteger("cassandra.streamdes.max_spill_file_size", Integer.MAX_VALUE);
+
+ public static final String BUFFER_FILE_PREFIX = "buf";
+ public static final String BUFFER_FILE_SUFFIX = "dat";
+
private final CFMetaData metadata;
private final DataInputPlus in;
private final SerializationHeader header;
@@ -196,11 +215,20 @@ public class StreamReader
private Row staticRow;
private IOException exception;
- public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header)
+ public StreamDeserializer(CFMetaData metadata, InputStream in, Version version, SerializationHeader header,
+ long totalSize, UUID sessionId) throws IOException
{
- assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
this.metadata = metadata;
- this.in = in;
+ // Legacy (pre-3.0) sstables need mark/reset support on the source stream, so wrap it in a rewindable stream (in-memory buffer plus a temporary spill file, sized below); 3.0+ sstables get a plain DataInputStreamPlus.
+ if (version.correspondingMessagingVersion() < MessagingService.VERSION_30)
+ {
+ logger.trace("Initializing rewindable input stream for reading legacy sstable with {} bytes with following " +
+ "parameters: initial_mem_buffer_size={}, max_mem_buffer_size={}, max_spill_file_size={}.",
+ totalSize, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, MAX_SPILL_FILE_SIZE);
+ File bufferFile = getTempBufferFile(metadata, totalSize, sessionId);
+ this.in = new RewindableDataInputStreamPlus(in, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, bufferFile, MAX_SPILL_FILE_SIZE);
+ } else
+ this.in = new DataInputPlus.DataInputStreamPlus(in);
this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
this.header = header;
}
@@ -292,5 +320,41 @@ public class StreamReader
public void close()
{
}
+
+ /* cleanup() is kept apart from close() because close() can run before this StreamDeserializer
+ has been fully consumed (e.g. when it is used in a try-with-resources block, as in
+ BigTableWriter.append()). It only acts when the input was wrapped in a RewindableDataInputStreamPlus; close errors are logged, not rethrown.
+ */
+ public void cleanup()
+ {
+ if (!(in instanceof RewindableDataInputStreamPlus))
+ return;
+ RewindableDataInputStreamPlus rewindable = (RewindableDataInputStreamPlus) in;
+ try
+ {
+ rewindable.close(false);
+ }
+ catch (IOException e)
+ {
+ logger.warn("Error while closing RewindableDataInputStreamPlus.", e);
+ }
+ }
+
+ private static File getTempBufferFile(CFMetaData metadata, long totalSize, UUID sessionId) throws IOException
+ {
+ // Returns <tmpDir>/buf-<sessionId>.dat, where tmpDir is a temporary writeable directory of the table asked to hold
+ // min(MAX_SPILL_FILE_SIZE, totalSize) bytes; a null directory (presumably no disk with enough space) raises IOException.
+ ColumnFamilyStore cfs = Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName);
+ if (cfs == null)
+ {
+ // schema was dropped during streaming
+ throw new RuntimeException(String.format("CF %s.%s was dropped during streaming", metadata.ksName, metadata.cfName));
+ }
+ long maxSize = Math.min(MAX_SPILL_FILE_SIZE, totalSize);
+ File tmpDir = cfs.getDirectories().getTemporaryWriteableDirectoryAsFile(maxSize);
+ if (tmpDir == null)
+ throw new IOException(String.format("Insufficient disk space to stream legacy sstable from %s.%s. Required disk space: %s.", metadata.ksName, metadata.cfName, FBUtilities.prettyPrintMemory(maxSize)));
+ return new File(tmpDir, String.format("%s-%s.%s", BUFFER_FILE_PREFIX, sessionId, BUFFER_FILE_SUFFIX));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 5210d5b..9719587 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,7 +24,6 @@ import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.slf4j.Logger;
@@ -38,7 +37,7 @@ import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.io.util.TrackedInputStream;
import org.apache.cassandra.utils.Pair;
/**
@@ -83,8 +82,10 @@ public class CompressedStreamReader extends StreamReader
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
- BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
- StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
+ TrackedInputStream in = new TrackedInputStream(cis);
+
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+ totalSize, session.planId());
SSTableMultiWriter writer = null;
try
{
@@ -115,17 +116,22 @@ public class CompressedStreamReader extends StreamReader
{
if (deserializer != null)
logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
- session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
if (writer != null)
{
writer.abort(e);
}
- drain(cis, in.getBytesRead());
+ drain(in, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ if (deserializer != null)
+ deserializer.cleanup();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e1e13b7..2b5047d 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -204,7 +204,7 @@ public class FileMessageHeader
out.writeLong(header.repairedAt);
out.writeInt(header.sstableLevel);
- if (version >= StreamMessage.VERSION_30)
+ if (version >= StreamMessage.VERSION_30 && header.version.storeRows())
SerializationHeader.serializer.serialize(header.version, header.header, out);
return compressionInfo;
}
@@ -227,7 +227,7 @@ public class FileMessageHeader
CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
long repairedAt = in.readLong();
int sstableLevel = in.readInt();
- SerializationHeader.Component header = version >= StreamMessage.VERSION_30
+ SerializationHeader.Component header = version >= StreamMessage.VERSION_30 && sstableVersion.storeRows()
? SerializationHeader.serializer.deserialize(sstableVersion, in)
: null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 84b463c..bd1a916 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -56,7 +56,7 @@ public class Repair extends NodeToolCmd
private boolean localDC = false;
@Option(title = "specific_dc", name = {"-dc", "--in-dc"}, description = "Use -dc to repair specific datacenters")
- private List<String> specificDataCenters = new ArrayList<>();
+ private List<String> specificDataCenters = new ArrayList<>();
@Option(title = "specific_host", name = {"-hosts", "--in-hosts"}, description = "Use -hosts to repair specific hosts")
private List<String> specificHosts = new ArrayList<>();