Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/17 17:07:05 UTC

[09/12] cassandra git commit: Support streaming of older version sstables in 3.0

Support streaming of older version sstables in 3.0

patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-10990


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8651b66
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8651b66
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8651b66

Branch: refs/heads/trunk
Commit: e8651b6625c7f6260852f2a9c45fb189c63ab528
Parents: 7f1339c
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Feb 5 12:38:39 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 17 10:04:44 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +
 .../org/apache/cassandra/db/Directories.java    |  30 +
 .../cassandra/db/SerializationHeader.java       |   5 +
 .../org/apache/cassandra/db/Serializers.java    | 114 ++--
 .../columniterator/AbstractSSTableIterator.java |   4 +-
 .../cassandra/hints/ChecksummedDataInput.java   |   8 +-
 .../org/apache/cassandra/hints/HintMessage.java |   4 +-
 .../io/compress/CompressedSequentialWriter.java |   8 +-
 .../io/sstable/SSTableSimpleIterator.java       |  11 +-
 .../io/sstable/format/SSTableReader.java        |   2 +-
 .../io/sstable/format/SSTableWriter.java        |   2 +-
 .../io/sstable/format/big/BigTableWriter.java   |   4 +-
 .../cassandra/io/util/BytesReadTracker.java     |  30 +
 .../apache/cassandra/io/util/DataPosition.java  |  21 +
 .../apache/cassandra/io/util/FileDataInput.java |   8 +-
 .../org/apache/cassandra/io/util/FileMark.java  |  20 -
 .../io/util/FileSegmentInputStream.java         |   6 +-
 .../cassandra/io/util/RandomAccessReader.java   |   8 +-
 .../cassandra/io/util/RewindableDataInput.java  |  30 +
 .../io/util/RewindableDataInputStreamPlus.java  | 569 +++++++++++++++++++
 .../cassandra/io/util/SequentialWriter.java     |   6 +-
 .../cassandra/io/util/TrackedDataInputPlus.java | 150 +++++
 .../cassandra/io/util/TrackedInputStream.java   |  76 +++
 .../cassandra/service/StorageService.java       |   1 +
 .../cassandra/streaming/StreamReader.java       |  84 ++-
 .../compress/CompressedStreamReader.java        |  18 +-
 .../streaming/messages/FileMessageHeader.java   |   4 +-
 .../apache/cassandra/tools/nodetool/Repair.java |   2 +-
 .../cassandra/utils/BytesReadTracker.java       | 153 -----
 .../cassandra/utils/CloseableIterator.java      |   1 -
 ...acy_jb_clust_compact-jb-1-CompressionInfo.db | Bin 0 -> 83 bytes
 ..._tables-legacy_jb_clust_compact-jb-1-Data.db | Bin 0 -> 5270 bytes
 ...ables-legacy_jb_clust_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...tables-legacy_jb_clust_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
 ...s-legacy_jb_clust_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
 ...bles-legacy_jb_clust_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ..._tables-legacy_jb_clust_compact-jb-1-TOC.txt |   7 +
 ...lust_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 75 bytes
 ...legacy_jb_clust_counter_compact-jb-1-Data.db | Bin 0 -> 4228 bytes
 ...gacy_jb_clust_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...egacy_jb_clust_counter_compact-jb-1-Index.db | Bin 0 -> 157685 bytes
 ..._jb_clust_counter_compact-jb-1-Statistics.db | Bin 0 -> 6791 bytes
 ...acy_jb_clust_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ...legacy_jb_clust_counter_compact-jb-1-TOC.txt |   7 +
 ...cy_jb_simple_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...tables-legacy_jb_simple_compact-jb-1-Data.db | Bin 0 -> 108 bytes
 ...bles-legacy_jb_simple_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...ables-legacy_jb_simple_compact-jb-1-Index.db | Bin 0 -> 75 bytes
 ...-legacy_jb_simple_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
 ...les-legacy_jb_simple_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ...tables-legacy_jb_simple_compact-jb-1-TOC.txt |   7 +
 ...mple_counter_compact-jb-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...egacy_jb_simple_counter_compact-jb-1-Data.db | Bin 0 -> 118 bytes
 ...acy_jb_simple_counter_compact-jb-1-Filter.db | Bin 0 -> 24 bytes
 ...gacy_jb_simple_counter_compact-jb-1-Index.db | Bin 0 -> 75 bytes
 ...jb_simple_counter_compact-jb-1-Statistics.db | Bin 0 -> 4395 bytes
 ...cy_jb_simple_counter_compact-jb-1-Summary.db | Bin 0 -> 71 bytes
 ...egacy_jb_simple_counter_compact-jb-1-TOC.txt |   7 +
 ...acy_ka_clust_compact-ka-1-CompressionInfo.db | Bin 0 -> 83 bytes
 ..._tables-legacy_ka_clust_compact-ka-1-Data.db | Bin 0 -> 5277 bytes
 ...les-legacy_ka_clust_compact-ka-1-Digest.sha1 |   1 +
 ...ables-legacy_ka_clust_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...tables-legacy_ka_clust_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
 ...s-legacy_ka_clust_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
 ...bles-legacy_ka_clust_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ..._tables-legacy_ka_clust_compact-ka-1-TOC.txt |   8 +
 ...lust_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 75 bytes
 ...legacy_ka_clust_counter_compact-ka-1-Data.db | Bin 0 -> 4527 bytes
 ...cy_ka_clust_counter_compact-ka-1-Digest.sha1 |   1 +
 ...gacy_ka_clust_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...egacy_ka_clust_counter_compact-ka-1-Index.db | Bin 0 -> 157685 bytes
 ..._ka_clust_counter_compact-ka-1-Statistics.db | Bin 0 -> 6859 bytes
 ...acy_ka_clust_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ...legacy_ka_clust_counter_compact-ka-1-TOC.txt |   8 +
 ...cy_ka_simple_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...tables-legacy_ka_simple_compact-ka-1-Data.db | Bin 0 -> 105 bytes
 ...es-legacy_ka_simple_compact-ka-1-Digest.sha1 |   1 +
 ...bles-legacy_ka_simple_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...ables-legacy_ka_simple_compact-ka-1-Index.db | Bin 0 -> 75 bytes
 ...-legacy_ka_simple_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
 ...les-legacy_ka_simple_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ...tables-legacy_ka_simple_compact-ka-1-TOC.txt |   8 +
 ...mple_counter_compact-ka-1-CompressionInfo.db | Bin 0 -> 43 bytes
 ...egacy_ka_simple_counter_compact-ka-1-Data.db | Bin 0 -> 124 bytes
 ...y_ka_simple_counter_compact-ka-1-Digest.sha1 |   1 +
 ...acy_ka_simple_counter_compact-ka-1-Filter.db | Bin 0 -> 24 bytes
 ...gacy_ka_simple_counter_compact-ka-1-Index.db | Bin 0 -> 75 bytes
 ...ka_simple_counter_compact-ka-1-Statistics.db | Bin 0 -> 4453 bytes
 ...cy_ka_simple_counter_compact-ka-1-Summary.db | Bin 0 -> 83 bytes
 ...egacy_ka_simple_counter_compact-ka-1-TOC.txt |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_la_clust_compact/la-1-big-Data.db    | Bin 0 -> 5286 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../legacy_la_clust_compact/la-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_la_clust_compact/la-1-big-Index.db   | Bin 0 -> 157685 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 6859 bytes
 .../legacy_la_clust_compact/la-1-big-Summary.db | Bin 0 -> 75 bytes
 .../legacy_la_clust_compact/la-1-big-TOC.txt    |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../la-1-big-Data.db                            | Bin 0 -> 4527 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../la-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../la-1-big-Index.db                           | Bin 0 -> 157685 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 6859 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 75 bytes
 .../la-1-big-TOC.txt                            |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_la_simple_compact/la-1-big-Data.db   | Bin 0 -> 106 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../legacy_la_simple_compact/la-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_la_simple_compact/la-1-big-Index.db  | Bin 0 -> 75 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 4453 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 75 bytes
 .../legacy_la_simple_compact/la-1-big-TOC.txt   |   8 +
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../la-1-big-Data.db                            | Bin 0 -> 124 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../la-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../la-1-big-Index.db                           | Bin 0 -> 75 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 4453 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 75 bytes
 .../la-1-big-TOC.txt                            |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 83 bytes
 .../legacy_ma_clust_compact/ma-1-big-Data.db    | Bin 0 -> 5393 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../legacy_ma_clust_compact/ma-1-big-Filter.db  | Bin 0 -> 24 bytes
 .../legacy_ma_clust_compact/ma-1-big-Index.db   | Bin 0 -> 157553 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 7046 bytes
 .../legacy_ma_clust_compact/ma-1-big-Summary.db | Bin 0 -> 47 bytes
 .../legacy_ma_clust_compact/ma-1-big-TOC.txt    |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 75 bytes
 .../ma-1-big-Data.db                            | Bin 0 -> 4606 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../ma-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../ma-1-big-Index.db                           | Bin 0 -> 157553 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 7055 bytes
 .../ma-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../ma-1-big-TOC.txt                            |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../legacy_ma_simple_compact/ma-1-big-Data.db   | Bin 0 -> 91 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../legacy_ma_simple_compact/ma-1-big-Filter.db | Bin 0 -> 24 bytes
 .../legacy_ma_simple_compact/ma-1-big-Index.db  | Bin 0 -> 26 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 4640 bytes
 .../ma-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../legacy_ma_simple_compact/ma-1-big-TOC.txt   |   8 +
 .../ma-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../ma-1-big-Data.db                            | Bin 0 -> 114 bytes
 .../ma-1-big-Digest.crc32                       |   1 +
 .../ma-1-big-Filter.db                          | Bin 0 -> 24 bytes
 .../ma-1-big-Index.db                           | Bin 0 -> 27 bytes
 .../ma-1-big-Statistics.db                      | Bin 0 -> 4649 bytes
 .../ma-1-big-Summary.db                         | Bin 0 -> 47 bytes
 .../ma-1-big-TOC.txt                            |   8 +
 .../cassandra/AbstractSerializationsTester.java |   1 -
 .../apache/cassandra/db/DirectoriesTest.java    |  98 ++--
 .../cassandra/gms/SerializationsTest.java       |   1 -
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../CompressedSequentialWriterTest.java         |   4 +-
 .../cassandra/io/sstable/LegacySSTableTest.java | 368 ++++++------
 .../io/util/BufferedRandomAccessFileTest.java   |   4 +-
 .../io/util/RandomAccessReaderTest.java         |   2 +-
 .../util/RewindableDataInputStreamPlusTest.java | 539 ++++++++++++++++++
 .../cassandra/utils/BytesReadTrackerTest.java   | 104 +++-
 165 files changed, 2101 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index defc25a..51cfc16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.5
+ * Support streaming pre-3.0 sstables (CASSANDRA-10990)
  * Add backpressure to compressed commit log (CASSANDRA-10971)
  * SSTableExport supports secondary index tables (CASSANDRA-11330)
  * Fix sstabledump to include missing info in debug output (CASSANDRA-11321)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c564d8d..f28df1c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -571,6 +571,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
          // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
         clearEphemeralSnapshots(directories);
 
+        directories.removeTemporaryDirectories();
+
         logger.trace("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName);
         LifecycleTransaction.removeUnfinishedLeftovers(metadata);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8744d43..83321ac 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -94,9 +94,11 @@ public class Directories
 
     public static final String BACKUPS_SUBDIR = "backups";
     public static final String SNAPSHOT_SUBDIR = "snapshots";
+    public static final String TMP_SUBDIR = "tmp";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
     public static final DataDirectory[] dataDirectories;
+
     static
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
@@ -322,6 +324,34 @@ public class Directories
     }
 
     /**
+     * Returns a temporary subdirectory on a non-blacklisted data directory
+     * that _currently_ has {@code writeSize} bytes as usable space.
+     * This method does not create the temporary directory.
+     *
+     * @throws IOError if all directories are blacklisted.
+     */
+    public File getTemporaryWriteableDirectoryAsFile(long writeSize)
+    {
+        File location = getLocationForDisk(getWriteableLocation(writeSize));
+        if (location == null)
+            return null;
+        return new File(location, TMP_SUBDIR);
+    }
+
+    public void removeTemporaryDirectories()
+    {
+        for (File dataDir : dataPaths)
+        {
+            File tmpDir = new File(dataDir, TMP_SUBDIR);
+            if (tmpDir.exists())
+            {
+                logger.debug("Removing temporary directory {}", tmpDir);
+                FileUtils.deleteRecursive(tmpDir);
+            }
+        }
+    }
+
+    /**
      * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space.
      *
      * @throws IOError if all directories are blacklisted.
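
A minimal usage sketch for the two helpers above (not part of the patch; the Directories instance and estimatedWriteSize are assumed to come from the caller):

    // Resolve the "tmp" subdirectory of a data directory that currently has enough
    // usable space. The helper only resolves the path; the caller must create it.
    static File resolveTempDir(Directories directories, long estimatedWriteSize)
    {
        File tmpDir = directories.getTemporaryWriteableDirectoryAsFile(estimatedWriteSize);
        if (tmpDir != null && !tmpDir.exists())
            tmpDir.mkdirs();
        return tmpDir;
    }
    // Stale "tmp" subdirectories left over from a previous session are swept on
    // table startup via removeTemporaryDirectories(), as wired into ColumnFamilyStore above.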

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 6e03756..0fd1281 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -70,6 +70,11 @@ public class SerializationHeader
         this.typeMap = typeMap;
     }
 
+    public static SerializationHeader makeWithoutStats(CFMetaData metadata)
+    {
+        return new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
+    }
+
     public static SerializationHeader forKeyCache(CFMetaData metadata)
     {
         // We don't save type information in the key cache (we could change
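
Where this factory slots in, sketched from the SSTableReader and SSTableWriter hunks further down (sstableHeader stands for the deserialized header component, which is absent for pre-3.0 sstables):

    SerializationHeader header = sstableHeader == null
                               ? SerializationHeader.makeWithoutStats(metadata) // pre-3.0 source: no stats on disk
                               : sstableHeader.toHeader(metadata);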

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 9b29d89..348fda3 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -29,8 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-
 /**
  * Holds references on serializers that depend on the table definition.
  */
@@ -48,62 +46,77 @@ public class Serializers
     // unnecessary (since IndexInfo.Serializer won't depend on the metadata either).
     public ISerializer<ClusteringPrefix> indexEntryClusteringPrefixSerializer(final Version version, final SerializationHeader header)
     {
-        if (!version.storeRows())
+        if (!version.storeRows() || header == null) //null header indicates streaming from pre-3.0 sstables
         {
-            return new ISerializer<ClusteringPrefix>()
+            return oldFormatSerializer(version);
+        }
+
+        return newFormatSerializer(version, header);
+    }
+
+    private ISerializer<ClusteringPrefix> oldFormatSerializer(final Version version)
+    {
+        return new ISerializer<ClusteringPrefix>()
+        {
+            SerializationHeader newHeader = SerializationHeader.makeWithoutStats(metadata);
+
+            public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
             {
-                public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
-                {
-                    // We should only use this for reading old sstable, never write new ones.
-                    throw new UnsupportedOperationException();
-                }
+                //we deserialize in the old format and serialize in the new format
+                ClusteringPrefix.serializer.serialize(clustering, out,
+                                                      version.correspondingMessagingVersion(),
+                                                      newHeader.clusteringTypes());
+            }
+
+            public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+            {
+                // We're reading the old cellname/composite
+                ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
+                assert bb.hasRemaining(); // empty cellnames were invalid
+
+                int clusteringSize = metadata.clusteringColumns().size();
+                // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
+                if (clusteringSize == 0)
+                    return Clustering.EMPTY;
 
-                public ClusteringPrefix deserialize(DataInputPlus in) throws IOException
+                if (!metadata.isCompound())
+                    return new Clustering(bb);
+
+                List<ByteBuffer> components = CompositeType.splitName(bb);
+                byte eoc = CompositeType.lastEOC(bb);
+
+                if (eoc == 0 || components.size() >= clusteringSize)
                 {
-                    // We're reading the old cellname/composite
-                    ByteBuffer bb = ByteBufferUtil.readWithShortLength(in);
-                    assert bb.hasRemaining(); // empty cellnames were invalid
-
-                    int clusteringSize = metadata.clusteringColumns().size();
-                    // If the table has no clustering column, then the cellname will just be the "column" name, which we ignore here.
-                    if (clusteringSize == 0)
-                        return Clustering.EMPTY;
-
-                    if (!metadata.isCompound())
-                        return new Clustering(bb);
-
-                    List<ByteBuffer> components = CompositeType.splitName(bb);
-                    byte eoc = CompositeType.lastEOC(bb);
-
-                    if (eoc == 0 || components.size() >= clusteringSize)
-                    {
-                        // That's a clustering.
-                        if (components.size() > clusteringSize)
-                            components = components.subList(0, clusteringSize);
-
-                        return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
-                    }
-                    else
-                    {
-                        // It's a range tombstone bound. It is a start since that's the only part we've ever included
-                        // in the index entries.
-                        Slice.Bound.Kind boundKind = eoc > 0
-                                                   ? Slice.Bound.Kind.EXCL_START_BOUND
-                                                   : Slice.Bound.Kind.INCL_START_BOUND;
-
-                        return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
-                    }
-                }
+                    // That's a clustering.
+                    if (components.size() > clusteringSize)
+                        components = components.subList(0, clusteringSize);
 
-                public long serializedSize(ClusteringPrefix clustering)
+                    return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
+                }
+                else
                 {
-                    // We should only use this for reading old sstable, never write new ones.
-                    throw new UnsupportedOperationException();
+                    // It's a range tombstone bound. It is a start since that's the only part we've ever included
+                    // in the index entries.
+                    Slice.Bound.Kind boundKind = eoc > 0
+                                                 ? Slice.Bound.Kind.EXCL_START_BOUND
+                                                 : Slice.Bound.Kind.INCL_START_BOUND;
+
+                    return Slice.Bound.create(boundKind, components.toArray(new ByteBuffer[components.size()]));
                 }
-            };
-        }
+            }
 
-        return new ISerializer<ClusteringPrefix>()
+            public long serializedSize(ClusteringPrefix clustering)
+            {
+                return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(),
+                                                                  newHeader.clusteringTypes());
+            }
+        };
+    }
+
+
+    private ISerializer<ClusteringPrefix> newFormatSerializer(final Version version, final SerializationHeader header)
+    {
+        return new ISerializer<ClusteringPrefix>() //Reading and writing from/to the new sstable format
         {
             public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
             {
@@ -121,4 +134,5 @@ public class Serializers
             }
         };
     }
+
 }
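
A condensed sketch of the new split (assuming the Serializers instance is reached through the table metadata; a null header is the pre-3.0 signal):

    // Deserialize an index-entry clustering written in the old cellname/composite
    // format, then re-serialize it in the new 3.0 format.
    ISerializer<ClusteringPrefix> serializer =
        metadata.serializers().indexEntryClusteringPrefixSerializer(version, null /* pre-3.0 stream */);
    ClusteringPrefix clustering = serializer.deserialize(in);
    serializer.serialize(clustering, out);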

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 8ac3dcb..0e2012e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
@@ -401,7 +401,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         private int currentIndexIdx;
 
         // Marks the beginning of the block corresponding to currentIndexIdx.
-        private FileMark mark;
+        private DataPosition mark;
 
         public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index 1dc6d1e..095d7f4 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,13 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
-import org.apache.cassandra.io.FSError;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.schema.CompressionParams;
 
 /**
  * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
@@ -48,7 +44,7 @@ public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderW
     private boolean crcUpdateDisabled;
 
     private long limit;
-    private FileMark limitMark;
+    private DataPosition limitMark;
 
     protected ChecksummedDataInput(Builder builder)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index e78738d..723ab6d 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -117,7 +117,7 @@ public final class HintMessage
             UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
 
             long hintSize = in.readUnsignedVInt();
-            BytesReadTracker countingIn = new BytesReadTracker(in);
+            TrackedDataInputPlus countingIn = new TrackedDataInputPlus(in);
             try
             {
                 return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 14f1ba7..9bd1145 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.schema.CompressionParams;
@@ -153,7 +153,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public FileMark mark()
+    public DataPosition mark()
     {
         if (!buffer.hasRemaining())
             doFlush(0);
@@ -161,7 +161,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public synchronized void resetAndTruncate(FileMark mark)
+    public synchronized void resetAndTruncate(DataPosition mark)
     {
         assert mark instanceof CompressedFileWriterMark;
 
@@ -306,7 +306,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     /**
      * Class to hold a mark to the position of the file
      */
-    protected static class CompressedFileWriterMark implements FileMark
+    protected static class CompressedFileWriterMark implements DataPosition
     {
         // chunk offset in the compressed file
         final long chunkOffset;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 365d469..f82db4e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.IOError;
 import java.util.Iterator;
 
+import org.apache.cassandra.io.util.RewindableDataInput;
 import org.apache.cassandra.utils.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -29,7 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -113,11 +114,9 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
                 // need to extract them. Which imply 2 passes (one to extract the static, then one for other value).
                 if (metadata.isStaticCompactTable())
                 {
-                    // Because we don't support streaming from old file version, the only case we should get there is for compaction,
-                    // where the DataInput should be a file based one.
-                    assert in instanceof FileDataInput;
-                    FileDataInput file = (FileDataInput)in;
-                    FileMark mark = file.mark();
+                    assert in instanceof RewindableDataInput;
+                    RewindableDataInput file = (RewindableDataInput)in;
+                    DataPosition mark = file.mark();
                     Row staticRow = LegacyLayout.extractStaticColumns(metadata, file, metadata.partitionColumns().statics);
                     file.reset(mark);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 8a778b7..b9561ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -427,7 +427,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                              System.currentTimeMillis(),
                                              statsMetadata,
                                              OpenReason.NORMAL,
-                                             header.toHeader(metadata));
+                                             header == null? null : header.toHeader(metadata));
 
         // special implementation of load to use non-pooled SegmentedFile builders
         try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 4cbbd70..5f35029 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -80,7 +80,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.metadataCollector = metadataCollector;
-        this.header = header;
+        this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable
         this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 2335e47..d3630d7 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -55,7 +55,7 @@ public class BigTableWriter extends SSTableWriter
     private final SegmentedFile.Builder dbuilder;
     protected final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
-    private FileMark dataMark;
+    private DataPosition dataMark;
 
     public BigTableWriter(Descriptor descriptor, 
                           Long keyCount, 
@@ -368,7 +368,7 @@ public class BigTableWriter extends SSTableWriter
         public final SegmentedFile.Builder builder;
         public final IndexSummaryBuilder summary;
         public final IFilter bf;
-        private FileMark mark;
+        private DataPosition mark;
 
         IndexWriter(long keyCount, final SequentialWriter dataFile)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/BytesReadTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BytesReadTracker.java b/src/java/org/apache/cassandra/io/util/BytesReadTracker.java
new file mode 100644
index 0000000..fc83856
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/BytesReadTracker.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public interface BytesReadTracker
+{
+    public long getBytesRead();
+
+    /**
+     * Resets the counter to {@code count}.
+     */
+    public void reset(long count);
+
+}
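
TrackedDataInputPlus and TrackedInputStream (both added below) implement this interface; the HintMessage hunk above shows the intended use. A condensed sketch:

    TrackedDataInputPlus countingIn = new TrackedDataInputPlus(in);
    Hint hint = Hint.serializer.deserialize(countingIn, version);
    long consumed = countingIn.getBytesRead(); // bytes pulled through the wrapper so far
    countingIn.reset(0);                       // restart the count for the next record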

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/DataPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataPosition.java b/src/java/org/apache/cassandra/io/util/DataPosition.java
new file mode 100644
index 0000000..e106dae
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataPosition.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+public interface DataPosition
+{}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/FileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java
index f56193b..1059b01 100644
--- a/src/java/org/apache/cassandra/io/util/FileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.io.util;
 import java.io.Closeable;
 import java.io.IOException;
 
-public interface FileDataInput extends DataInputPlus, Closeable
+public interface FileDataInput extends RewindableDataInput, Closeable
 {
     String getPath();
 
@@ -30,11 +30,5 @@ public interface FileDataInput extends DataInputPlus, Closeable
 
     void seek(long pos) throws IOException;
 
-    FileMark mark();
-
-    void reset(FileMark mark) throws IOException;
-
-    long bytesPastMark(FileMark mark);
-
     long getFilePointer();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/FileMark.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileMark.java b/src/java/org/apache/cassandra/io/util/FileMark.java
deleted file mode 100644
index 781bc1e..0000000
--- a/src/java/org/apache/cassandra/io/util/FileMark.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-public interface FileMark {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
index 425c7d6..a585215 100644
--- a/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java
@@ -74,17 +74,17 @@ public class FileSegmentInputStream extends DataInputBuffer implements FileDataI
         return false;
     }
 
-    public FileMark mark()
+    public DataPosition mark()
     {
         throw new UnsupportedOperationException();
     }
 
-    public void reset(FileMark mark)
+    public void reset(DataPosition mark)
     {
         throw new UnsupportedOperationException();
     }
 
-    public long bytesPastMark(FileMark mark)
+    public long bytesPastMark(DataPosition mark)
     {
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index b495bf0..1943773 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -200,19 +200,19 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         return bytes;
     }
 
-    public FileMark mark()
+    public DataPosition mark()
     {
         markedPointer = current();
         return new BufferedRandomAccessFileMark(markedPointer);
     }
 
-    public void reset(FileMark mark)
+    public void reset(DataPosition mark)
     {
         assert mark instanceof BufferedRandomAccessFileMark;
         seek(((BufferedRandomAccessFileMark) mark).pointer);
     }
 
-    public long bytesPastMark(FileMark mark)
+    public long bytesPastMark(DataPosition mark)
     {
         assert mark instanceof BufferedRandomAccessFileMark;
         long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer;
@@ -262,7 +262,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     /**
      * Class to hold a mark to the position of the file
      */
-    protected static class BufferedRandomAccessFileMark implements FileMark
+    protected static class BufferedRandomAccessFileMark implements DataPosition
     {
         final long pointer;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/RewindableDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RewindableDataInput.java b/src/java/org/apache/cassandra/io/util/RewindableDataInput.java
new file mode 100644
index 0000000..c202f60
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RewindableDataInput.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+
+public interface RewindableDataInput extends DataInputPlus
+{
+    DataPosition mark();
+
+    void reset(DataPosition mark) throws IOException;
+
+    long bytesPastMark(DataPosition mark);
+}
\ No newline at end of file
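
With mark/reset now defined against any rewindable input rather than only file-backed ones, a reader can double back over a marked region of a network stream. A sketch using the RewindableDataInputStreamPlus added below (the buffer sizes, spill file name, tmpDir and networkIn are illustrative assumptions):

    File spill = new File(tmpDir, "rewind.tmp");
    try (RewindableDataInputStreamPlus input =
             new RewindableDataInputStreamPlus(networkIn, 4096, 32 * 1024, spill, 8 * 1024 * 1024))
    {
        DataPosition mark = input.mark();
        // first pass: e.g. extract the static row, as in SSTableSimpleIterator above
        input.reset(mark); // rewind: bytes replay from the heap buffer, then the spill file
        // second pass over the same bytes
    }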

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java b/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java
new file mode 100644
index 0000000..3a680f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RewindableDataInputStreamPlus.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * Adds mark/reset functionality to another input stream by caching read bytes to a memory buffer and
+ * spilling to disk if necessary.
+ *
+ * When the stream is marked via {@link this#mark()} or {@link this#mark(int)}, up to
+ * <code>maxMemBufferSize</code> bytes will be cached in memory (heap). If more than
+ * <code>maxMemBufferSize</code> bytes are read while the stream is marked, the
+ * following bytes are cached in the <code>spillFile</code> for up to <code>maxDiskBufferSize</code> bytes.
+ *
+ * Please note that successive calls to {@link this#mark()} and {@link this#reset()} will write
+ * sequentially to the same <code>spillFile</code> until <code>maxDiskBufferSize</code> is reached.
+ * At this point, if less than <code>maxDiskBufferSize</code> bytes are currently cached in the
+ * <code>spillFile</code>, the remaining bytes are written to the beginning of the file,
+ * treating the <code>spillFile</code> as a circular buffer.
+ *
+ * If more than <code>maxMemBufferSize + maxDiskBufferSize</code> bytes are cached while the stream is marked,
+ * the following {@link this#reset()} invocation will throw an {@link IOException}.
+ *
+ */
+public class RewindableDataInputStreamPlus extends FilterInputStream implements RewindableDataInput, Closeable
+{
+    private boolean marked = false;
+    private boolean exhausted = false;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+
+    protected int memAvailable = 0;
+    protected int diskTailAvailable = 0;
+    protected int diskHeadAvailable = 0;
+
+    private final File spillFile;
+    private final int initialMemBufferSize;
+    private final int maxMemBufferSize;
+    private final int maxDiskBufferSize;
+
+    private volatile byte memBuffer[];
+    private int memBufferSize;
+    private RandomAccessFile spillBuffer;
+
+    private final DataInputPlus dataReader;
+
+    public RewindableDataInputStreamPlus(InputStream in, int initialMemBufferSize, int maxMemBufferSize,
+                                         File spillFile, int maxDiskBufferSize)
+    {
+        super(in);
+        dataReader = new DataInputStreamPlus(this);
+        this.initialMemBufferSize = initialMemBufferSize;
+        this.maxMemBufferSize = maxMemBufferSize;
+        this.spillFile = spillFile;
+        this.maxDiskBufferSize = maxDiskBufferSize;
+    }
+
+    /* RewindableDataInput methods */
+
+    /**
+     * Marks the current position of a stream to return to this position later via the {@link this#reset(DataPosition)} method.
+     * @return an empty {@link DataPosition} object
+     */
+    public DataPosition mark()
+    {
+        mark(0);
+        return new RewindableDataInputPlusMark();
+    }
+
+    /**
+     * Rewinds to the previously marked position via the {@link this#mark()} method.
+     * @param mark it is not possible to return to a custom position, so this parameter is ignored.
+     * @throws IOException if an error occurs while resetting
+     */
+    public void reset(DataPosition mark) throws IOException
+    {
+        reset();
+    }
+
+    public long bytesPastMark(DataPosition mark)
+    {
+        return maxMemBufferSize - memAvailable + (diskTailAvailable == -1? 0 : maxDiskBufferSize - diskHeadAvailable - diskTailAvailable);
+    }
+
+
+    protected static class RewindableDataInputPlusMark implements DataPosition
+    {
+    }
+
+    /* InputStream methods */
+
+    public boolean markSupported()
+    {
+        return true;
+    }
+
+    /**
+     * Marks the current position of a stream to return to this position
+     * later via the {@link this#reset()} method.
+     * @param readlimit the maximum number of bytes to cache
+     */
+    public synchronized void mark(int readlimit)
+    {
+        if (marked)
+            throw new IllegalStateException("Cannot mark already marked stream.");
+
+        if (memAvailable > 0 || diskHeadAvailable > 0 || diskTailAvailable > 0)
+            throw new IllegalStateException("Can only mark stream after reading previously marked data.");
+
+        marked = true;
+        memAvailable = maxMemBufferSize;
+        diskHeadAvailable = -1;
+        diskTailAvailable = -1;
+    }
+
+    public synchronized void reset() throws IOException
+    {
+        if (!marked)
+            throw new IOException("Must call mark() before calling reset().");
+
+        if (exhausted)
+            throw new IOException(String.format("Read more than capacity: %d bytes.", maxMemBufferSize + maxDiskBufferSize));
+
+        memAvailable = maxMemBufferSize - memAvailable;
+        memBufferSize = memAvailable;
+
+        if (diskTailAvailable == -1)
+        {
+            diskHeadAvailable = 0;
+            diskTailAvailable = 0;
+        }
+        else
+        {
+            int initialPos = diskTailAvailable > 0 ? 0 : (int)getIfNotClosed(spillBuffer).getFilePointer();
+            int diskMarkpos = initialPos + diskHeadAvailable;
+            getIfNotClosed(spillBuffer).seek(diskMarkpos);
+
+            diskHeadAvailable = diskMarkpos - diskHeadAvailable;
+            diskTailAvailable = (maxDiskBufferSize - diskTailAvailable) - diskMarkpos;
+        }
+
+        marked = false;
+    }
+
+    public int available() throws IOException
+    {
+        return super.available() + (marked ? 0 : memAvailable + diskHeadAvailable + diskTailAvailable);
+    }
+
+    public int read() throws IOException
+    {
+        int read = readOne();
+        if (read == -1)
+            return read;
+
+        if (marked)
+        {
+            //mark exhausted
+            if (isExhausted(1))
+            {
+                exhausted = true;
+                return read;
+            }
+
+            writeOne(read);
+        }
+
+        return read;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int readBytes = readMulti(b, off, len);
+        if (readBytes == -1)
+            return readBytes;
+
+        if (marked)
+        {
+            //check we have space on buffer
+            if (isExhausted(readBytes))
+            {
+                exhausted = true;
+                return readBytes;
+            }
+
+            writeMulti(b, off, readBytes);
+        }
+
+        return readBytes;
+    }
+
+    private void maybeCreateDiskBuffer() throws IOException
+    {
+        if (spillBuffer == null)
+        {
+            if (!spillFile.getParentFile().exists())
+                spillFile.getParentFile().mkdirs();
+            spillFile.createNewFile();
+
+            this.spillBuffer = new RandomAccessFile(spillFile, "rw");
+        }
+    }
+
+
+    private int readOne() throws IOException
+    {
+        if (!marked)
+        {
+            if (memAvailable > 0)
+            {
+                int pos = memBufferSize - memAvailable;
+                memAvailable--;
+                return getIfNotClosed(memBuffer)[pos] & 0xff;
+            }
+
+            if (diskTailAvailable > 0 || diskHeadAvailable > 0)
+            {
+                int read = getIfNotClosed(spillBuffer).read();
+                if (diskTailAvailable > 0)
+                    diskTailAvailable--;
+                else if (diskHeadAvailable > 0)
+                    diskHeadAvailable--;
+                if (diskTailAvailable == 0)
+                    spillBuffer.seek(0);
+                return read;
+            }
+        }
+
+        return getIfNotClosed(in).read();
+    }
+
+    private boolean isExhausted(int readBytes)
+    {
+        return exhausted || readBytes > memAvailable + (long)(diskTailAvailable == -1? maxDiskBufferSize : diskTailAvailable + diskHeadAvailable);
+    }
+
+    private int readMulti(byte[] b, int off, int len) throws IOException
+    {
+        int readBytes = 0;
+        if (!marked)
+        {
+            if (memAvailable > 0)
+            {
+                readBytes += memAvailable < len ? memAvailable : len;
+                int pos = memBufferSize - memAvailable;
+                System.arraycopy(memBuffer, pos, b, off, readBytes);
+                memAvailable -= readBytes;
+                off += readBytes;
+                len -= readBytes;
+            }
+            if (len > 0 && diskTailAvailable > 0)
+            {
+                int readFromTail = diskTailAvailable < len? diskTailAvailable : len;
+                getIfNotClosed(spillBuffer).read(b, off, readFromTail);
+                readBytes += readFromTail;
+                diskTailAvailable -= readFromTail;
+                off += readFromTail;
+                len -= readFromTail;
+                if (diskTailAvailable == 0)
+                    spillBuffer.seek(0);
+            }
+            if (len > 0 && diskHeadAvailable > 0)
+            {
+                int readFromHead = diskHeadAvailable < len? diskHeadAvailable : len;
+                getIfNotClosed(spillBuffer).read(b, off, readFromHead);
+                readBytes += readFromHead;
+                diskHeadAvailable -= readFromHead;
+                off += readFromHead;
+                len -= readFromHead;
+            }
+        }
+
+        if (len > 0)
+            readBytes += getIfNotClosed(in).read(b, off, len);
+
+        return readBytes;
+    }
+
+    private void writeMulti(byte[] b, int off, int len) throws IOException
+    {
+        if (memAvailable > 0)
+        {
+            if (memBuffer == null)
+                memBuffer = new byte[initialMemBufferSize];
+            int pos = maxMemBufferSize - memAvailable;
+            int memWritten = memAvailable < len? memAvailable : len;
+            if (pos + memWritten >= getIfNotClosed(memBuffer).length)
+                growMemBuffer(pos, memWritten);
+            System.arraycopy(b, off, memBuffer, pos, memWritten);
+            off += memWritten;
+            len -= memWritten;
+            memAvailable -= memWritten;
+        }
+
+        if (len > 0)
+        {
+            if (diskTailAvailable == -1)
+            {
+                maybeCreateDiskBuffer();
+                diskHeadAvailable = (int)spillBuffer.getFilePointer();
+                diskTailAvailable = maxDiskBufferSize - diskHeadAvailable;
+            }
+
+            if (len > 0 && diskTailAvailable > 0)
+            {
+                int diskTailWritten = diskTailAvailable < len? diskTailAvailable : len;
+                getIfNotClosed(spillBuffer).write(b, off, diskTailWritten);
+                off += diskTailWritten;
+                len -= diskTailWritten;
+                diskTailAvailable -= diskTailWritten;
+                if (diskTailAvailable == 0)
+                    spillBuffer.seek(0);
+            }
+
+            if (len > 0 && diskHeadAvailable > 0)
+            {
+                int diskHeadWritten = diskHeadAvailable < len? diskHeadAvailable : len;
+                getIfNotClosed(spillBuffer).write(b, off, diskHeadWritten);
+            }
+        }
+    }
+
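+    /**
+     * Single-byte counterpart of {@link #writeMulti(byte[], int, int)}.
+     */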
+    private void writeOne(int value) throws IOException
+    {
+        if (memAvailable > 0)
+        {
+            if (memBuffer == null)
+                memBuffer = new byte[initialMemBufferSize];
+            int pos = maxMemBufferSize - memAvailable;
+            if (pos == getIfNotClosed(memBuffer).length)
+                growMemBuffer(pos, 1);
+            getIfNotClosed(memBuffer)[pos] = (byte)value;
+            memAvailable--;
+            return;
+        }
+
+        if (diskTailAvailable == -1)
+        {
+            maybeCreateDiskBuffer();
+            diskHeadAvailable = (int)spillBuffer.getFilePointer();
+            diskTailAvailable = maxDiskBufferSize - diskHeadAvailable;
+        }
+
+        if (diskTailAvailable > 0 || diskHeadAvailable > 0)
+        {
+            getIfNotClosed(spillBuffer).write(value);
+            if (diskTailAvailable > 0)
+                diskTailAvailable--;
+            else if (diskHeadAvailable > 0)
+                diskHeadAvailable--;
+            if (diskTailAvailable == 0)
+                spillBuffer.seek(0);
+            return;
+        }
+    }
+
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+
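+    /**
+     * Grows the in-memory buffer to twice the currently required size, capped at
+     * maxMemBufferSize.
+     */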
+    private void growMemBuffer(int pos, int writeSize)
+    {
+        int newSize = Math.min(2 * (pos + writeSize), maxMemBufferSize);
+        byte newBuffer[] = new byte[newSize];
+        System.arraycopy(memBuffer, 0, newBuffer, 0, (int)pos);
+        memBuffer = newBuffer;
+    }
+
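+    /**
+     * Skips up to n bytes. While marked, skipped bytes must still be cached for a
+     * later reset, so they are read one at a time; otherwise any replayable bytes
+     * are consumed before skipping on the underlying stream.
+     */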
+    public long skip(long n) throws IOException
+    {
+        long skipped = 0;
+
+        if (marked)
+        {
+            //if marked, we need to cache skipped bytes
+            while (n-- > 0 && read() != -1)
+            {
+                skipped++;
+            }
+            return skipped;
+        }
+
+        if (memAvailable > 0)
+        {
+            skipped += memAvailable < n ? memAvailable : n;
+            memAvailable -= skipped;
+            n -= skipped;
+        }
+        if (n > 0 && diskTailAvailable > 0)
+        {
+            int skipFromTail = diskTailAvailable < n? diskTailAvailable : (int)n;
+            getIfNotClosed(spillBuffer).skipBytes(skipFromTail);
+            diskTailAvailable -= skipFromTail;
+            skipped += skipFromTail;
+            n -= skipFromTail;
+            if (diskTailAvailable == 0)
+                spillBuffer.seek(0);
+        }
+        if (n > 0 && diskHeadAvailable > 0)
+        {
+            int skipFromHead = diskHeadAvailable < n? diskHeadAvailable : (int)n;
+            getIfNotClosed(spillBuffer).skipBytes(skipFromHead);
+            diskHeadAvailable -= skipFromHead;
+            skipped += skipFromHead;
+            n -= skipFromHead;
+        }
+
+        if (n > 0)
+            skipped += getIfNotClosed(in).skip(n);
+
+        return skipped;
+    }
+
+    private <T> T getIfNotClosed(T in) throws IOException
+    {
+        if (closed.get())
+            throw new IOException("Stream closed");
+        return in;
+    }
+
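+    /**
+     * Equivalent to close(true): also closes the underlying stream.
+     */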
+    public void close() throws IOException
+    {
+        close(true);
+    }
+
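+    /**
+     * Releases the buffers and deletes the spill file, closing the underlying stream
+     * only if closeUnderlying is true (StreamDeserializer.cleanup() passes false so
+     * the session's network stream stays open).
+     */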
+    public void close(boolean closeUnderlying) throws IOException
+    {
+        if (closed.compareAndSet(false, true))
+        {
+            Throwable fail = null;
+            if (closeUnderlying)
+            {
+                try
+                {
+                    super.close();
+                }
+                catch (IOException e)
+                {
+                    fail = merge(fail, e);
+                }
+            }
+            try
+            {
+                if (spillBuffer != null)
+                {
+                    this.spillBuffer.close();
+                    this.spillBuffer = null;
+                }
+            }
+            catch (IOException e)
+            {
+                fail = merge(fail, e);
+            }
+            try
+            {
+                if (spillFile.exists())
+                {
+                    spillFile.delete();
+                }
+            }
+            catch (Throwable e)
+            {
+                fail = merge(fail, e);
+            }
+            maybeFail(fail, IOException.class);
+        }
+    }
+
+    /* DataInputPlus methods */
+
+    public void readFully(byte[] b) throws IOException
+    {
+        dataReader.readFully(b);
+    }
+
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        dataReader.readFully(b, off, len);
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        return dataReader.skipBytes(n);
+    }
+
+    public boolean readBoolean() throws IOException
+    {
+        return dataReader.readBoolean();
+    }
+
+    public byte readByte() throws IOException
+    {
+        return dataReader.readByte();
+    }
+
+    public int readUnsignedByte() throws IOException
+    {
+        return dataReader.readUnsignedByte();
+    }
+
+    public short readShort() throws IOException
+    {
+        return dataReader.readShort();
+    }
+
+    public int readUnsignedShort() throws IOException
+    {
+        return dataReader.readUnsignedShort();
+    }
+
+    public char readChar() throws IOException
+    {
+        return dataReader.readChar();
+    }
+
+    public int readInt() throws IOException
+    {
+        return dataReader.readInt();
+    }
+
+    public long readLong() throws IOException
+    {
+        return dataReader.readLong();
+    }
+
+    public float readFloat() throws IOException
+    {
+        return dataReader.readFloat();
+    }
+
+    public double readDouble() throws IOException
+    {
+        return dataReader.readDouble();
+    }
+
+    public String readLine() throws IOException
+    {
+        return dataReader.readLine();
+    }
+
+    public String readUTF() throws IOException
+    {
+        return dataReader.readUTF();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index dd49868..26316a2 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -297,7 +297,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
         return bufferOffset + (buffer == null ? 0 : buffer.position());
     }
 
-    public FileMark mark()
+    public DataPosition mark()
     {
         return new BufferedFileWriterMark(current());
     }
@@ -306,7 +306,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
      * Drops all buffered data that's past the limits of our new file mark + buffer capacity, or syncs and truncates
      * the underlying file to the marked position
      */
-    public void resetAndTruncate(FileMark mark)
+    public void resetAndTruncate(DataPosition mark)
     {
         assert mark instanceof BufferedFileWriterMark;
 
@@ -404,7 +404,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
     /**
      * Class to hold a mark to the position of the file
      */
-    protected static class BufferedFileWriterMark implements FileMark
+    protected static class BufferedFileWriterMark implements DataPosition
     {
         final long pointer;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java b/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java
new file mode 100644
index 0000000..dc5bbb6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/TrackedDataInputPlus.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Tracks the number of bytes read from the given DataInput.
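+ * <p>
+ * Typical use: wrap the source input, hand the wrapper to a deserializer, then call
+ * getBytesRead() to find out how many bytes were consumed.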
+ */
+public class TrackedDataInputPlus implements DataInputPlus, BytesReadTracker
+{
+    private long bytesRead;
+    final DataInput source;
+
+    public TrackedDataInputPlus(DataInput source)
+    {
+        this.source = source;
+    }
+
+    public long getBytesRead()
+    {
+        return bytesRead;
+    }
+
+    /**
+     * Reset the bytes-read counter to {@code count}.
+     */
+    public void reset(long count)
+    {
+        bytesRead = count;
+    }
+
+    public boolean readBoolean() throws IOException
+    {
+        boolean bool = source.readBoolean();
+        bytesRead += 1;
+        return bool;
+    }
+
+    public byte readByte() throws IOException
+    {
+        byte b = source.readByte();
+        bytesRead += 1;
+        return b;
+    }
+
+    public char readChar() throws IOException
+    {
+        char c = source.readChar();
+        bytesRead += 2;
+        return c;
+    }
+
+    public double readDouble() throws IOException
+    {
+        double d = source.readDouble();
+        bytesRead += 8;
+        return d;
+    }
+
+    public float readFloat() throws IOException
+    {
+        float f = source.readFloat();
+        bytesRead += 4;
+        return f;
+    }
+
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        source.readFully(b, off, len);
+        bytesRead += len;
+    }
+
+    public void readFully(byte[] b) throws IOException
+    {
+        source.readFully(b);
+        bytesRead += b.length;
+    }
+
+    public int readInt() throws IOException
+    {
+        int i = source.readInt();
+        bytesRead += 4;
+        return i;
+    }
+
+    public String readLine() throws IOException
+    {
+        // this method is deprecated and the bytes it reads cannot be tracked,
+        // so just throw an exception
+        throw new UnsupportedOperationException();
+    }
+
+    public long readLong() throws IOException
+    {
+        long l = source.readLong();
+        bytesRead += 8;
+        return l;
+    }
+
+    public short readShort() throws IOException
+    {
+        short s = source.readShort();
+        bytesRead += 2;
+        return s;
+    }
+
+    public String readUTF() throws IOException
+    {
+        return DataInputStream.readUTF(this);
+    }
+
+    public int readUnsignedByte() throws IOException
+    {
+        int i = source.readUnsignedByte();
+        bytesRead += 1;
+        return i;
+    }
+
+    public int readUnsignedShort() throws IOException
+    {
+        int i = source.readUnsignedShort();
+        bytesRead += 2;
+        return i;
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        int skipped = source.skipBytes(n);
+        bytesRead += skipped;
+        return skipped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/io/util/TrackedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/TrackedInputStream.java b/src/java/org/apache/cassandra/io/util/TrackedInputStream.java
new file mode 100644
index 0000000..f398d30
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/TrackedInputStream.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Tracks the number of bytes read from the given InputStream.
+ */
+public class TrackedInputStream extends FilterInputStream implements BytesReadTracker
+{
+    private long bytesRead;
+
+    public TrackedInputStream(InputStream source)
+    {
+        super(source);
+    }
+
+    public long getBytesRead()
+    {
+        return bytesRead;
+    }
+
+    /**
+     * Reset the bytes-read counter to {@code count}.
+     */
+    public void reset(long count)
+    {
+        bytesRead = count;
+    }
+
+    public int read() throws IOException
+    {
+        int read = super.read();
+        bytesRead += 1;
+        return read;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int read = super.read(b, off, len);
+        bytesRead += read;
+        return read;
+    }
+
+    public int read(byte[] b) throws IOException
+    {
+        int read = super.read(b);
+        bytesRead += read;
+        return read;
+    }
+
+    public long skip(long n) throws IOException
+    {
+        long skip = super.skip(n);
+        bytesRead += skip;
+        return skip;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f445e25..fd7f4b6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4221,6 +4221,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return Collections.unmodifiableList(keyspaceNamesList);
     }
 
+
     public List<String> getNonSystemKeyspaces()
     {
         List<String> keyspaceNamesList = new ArrayList<>(Schema.instance.getNonSystemKeyspaces());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 268f974..f8db26b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,10 +40,13 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.io.util.TrackedInputStream;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 
@@ -105,9 +108,9 @@ public class StreamReader
                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
                      cfs.getColumnFamilyName());
 
-        DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
-        BytesReadTracker in = new BytesReadTracker(dis);
-        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
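+        // count bytes as they are consumed so that, on error, drain() can skip the
+        // remainder of the incoming file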
+        TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+                                                                 totalSize, session.planId());
         SSTableMultiWriter writer = null;
         try
         {
@@ -131,12 +134,22 @@ public class StreamReader
             {
                 writer.abort(e);
             }
-            drain(dis, in.getBytesRead());
+            drain(in, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            if (deserializer != null)
+                deserializer.cleanup();
+        }
+    }
+
+    protected SerializationHeader getHeader(CFMetaData metadata)
+    {
+        return header != null? header.toHeader(metadata) : null; // pre-3.0 sstables have no SerializationHeader
     }
 
     protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
@@ -146,8 +159,7 @@ public class StreamReader
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
 
-
-        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -185,6 +197,13 @@ public class StreamReader
 
     public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
     {
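+        // Buffering knobs for rewindable reads of pre-3.0 sstables: kept in memory up
+        // to max_mem_buffer_size, then spilled to a temporary file up to max_spill_file_size.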
+        public static final int INITIAL_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.initial_mem_buffer_size", 32768);
+        public static final int MAX_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.max_mem_buffer_size", 1048576);
+        public static final int MAX_SPILL_FILE_SIZE = Integer.getInteger("cassandra.streamdes.max_spill_file_size", Integer.MAX_VALUE);
+
+        public static final String BUFFER_FILE_PREFIX = "buf";
+        public static final String BUFFER_FILE_SUFFIX = "dat";
+
         private final CFMetaData metadata;
         private final DataInputPlus in;
         private final SerializationHeader header;
@@ -196,11 +215,20 @@ public class StreamReader
         private Row staticRow;
         private IOException exception;
 
-        public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header)
+        public StreamDeserializer(CFMetaData metadata, InputStream in, Version version, SerializationHeader header,
+                                  long totalSize, UUID sessionId) throws IOException
         {
-            assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
             this.metadata = metadata;
-            this.in = in;
+            // streaming pre-3.0 sstables require mark/reset support from source stream
+            if (version.correspondingMessagingVersion() < MessagingService.VERSION_30)
+            {
+                logger.trace("Initializing rewindable input stream for reading legacy sstable with {} bytes with following " +
+                             "parameters: initial_mem_buffer_size={}, max_mem_buffer_size={}, max_spill_file_size={}.",
+                             totalSize, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, MAX_SPILL_FILE_SIZE);
+                File bufferFile = getTempBufferFile(metadata, totalSize, sessionId);
+                this.in = new RewindableDataInputStreamPlus(in, INITIAL_MEM_BUFFER_SIZE, MAX_MEM_BUFFER_SIZE, bufferFile, MAX_SPILL_FILE_SIZE);
+            }
+            else
+                this.in = new DataInputPlus.DataInputStreamPlus(in);
             this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
             this.header = header;
         }
@@ -292,5 +320,41 @@ public class StreamReader
         public void close()
         {
         }
+
+        /* We have a separate cleanup method because sometimes close is called before exhausting the
+           StreamDeserializer (for instance, when enclosed in a try-with-resources wrapper, such as in
+           BigTableWriter.append()).
+         */
+        public void cleanup()
+        {
+            if (in instanceof RewindableDataInputStreamPlus)
+            {
+                try
+                {
+                    ((RewindableDataInputStreamPlus) in).close(false);
+                }
+                catch (IOException e)
+                {
+                    logger.warn("Error while closing RewindableDataInputStreamPlus.", e);
+                }
+            }
+        }
+
+        private static File getTempBufferFile(CFMetaData metadata, long totalSize, UUID sessionId) throws IOException
+        {
+            ColumnFamilyStore cfs = Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName);
+            if (cfs == null)
+            {
+                // schema was dropped during streaming
+                throw new RuntimeException(String.format("CF %s.%s was dropped during streaming", metadata.ksName, metadata.cfName));
+            }
+
+            long maxSize = Math.min(MAX_SPILL_FILE_SIZE, totalSize);
+            File tmpDir = cfs.getDirectories().getTemporaryWriteableDirectoryAsFile(maxSize);
+            if (tmpDir == null)
+                throw new IOException(String.format("No sufficient disk space to stream legacy sstable from {}.{}. " +
+                                                         "Required disk space: %s.", FBUtilities.prettyPrintMemory(maxSize)));
+            return new File(tmpDir, String.format("%s-%s.%s", BUFFER_FILE_PREFIX, sessionId, BUFFER_FILE_SUFFIX));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 5210d5b..9719587 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,7 +24,6 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 
 import org.slf4j.Logger;
@@ -38,7 +37,7 @@ import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.io.util.TrackedInputStream;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -83,8 +82,10 @@ public class CompressedStreamReader extends StreamReader
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
                                                               inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
-        BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
-        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
+        TrackedInputStream in = new TrackedInputStream(cis);
+
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+                                                                 totalSize, session.planId());
         SSTableMultiWriter writer = null;
         try
         {
@@ -115,17 +116,22 @@ public class CompressedStreamReader extends StreamReader
         {
             if (deserializer != null)
                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
             if (writer != null)
             {
                 writer.abort(e);
             }
-            drain(cis, in.getBytesRead());
+            drain(in, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            if (deserializer != null)
+                deserializer.cleanup();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e1e13b7..2b5047d 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -204,7 +204,7 @@ public class FileMessageHeader
             out.writeLong(header.repairedAt);
             out.writeInt(header.sstableLevel);
 
-            if (version >= StreamMessage.VERSION_30)
+            if (version >= StreamMessage.VERSION_30 && header.version.storeRows())
                 SerializationHeader.serializer.serialize(header.version, header.header, out);
             return compressionInfo;
         }
@@ -227,7 +227,7 @@ public class FileMessageHeader
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
             long repairedAt = in.readLong();
             int sstableLevel = in.readInt();
-            SerializationHeader.Component header = version >= StreamMessage.VERSION_30
+            SerializationHeader.Component header = version >= StreamMessage.VERSION_30 && sstableVersion.storeRows()
                                                  ? SerializationHeader.serializer.deserialize(sstableVersion, in)
                                                  : null;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 84b463c..bd1a916 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -56,7 +56,7 @@ public class Repair extends NodeToolCmd
     private boolean localDC = false;
 
     @Option(title = "specific_dc", name = {"-dc", "--in-dc"}, description = "Use -dc to repair specific datacenters")
-    private List<String> specificDataCenters = new ArrayList<>();;
+    private List<String> specificDataCenters = new ArrayList<>();
 
     @Option(title = "specific_host", name = {"-hosts", "--in-hosts"}, description = "Use -hosts to repair specific hosts")
     private List<String> specificHosts = new ArrayList<>();