You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/03/17 17:07:03 UTC

[07/12] cassandra git commit: Support streaming of older version sstables in 3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt b/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
new file mode 100644
index 0000000..a29a600
--- /dev/null
+++ b/test/data/legacy-sstables/ma/legacy_tables/legacy_ma_simple_counter_compact/ma-1-big-TOC.txt
@@ -0,0 +1,8 @@
+Index.db
+Statistics.db
+Digest.crc32
+Summary.db
+Data.db
+TOC.txt
+Filter.db
+CompressionInfo.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 501f4ae..3a1f348 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 8732881..f864bbc 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -243,46 +243,76 @@ public class DirectoriesTest
         for (CFMetaData cfm : CFM)
         {
             Directories directories = new Directories(cfm);
-            Directories.SSTableLister lister;
-            Set<File> listed;
+            checkFiles(cfm, directories);
+        }
+    }
 
-            // List all but no snapshot, backup
-            lister = directories.sstableLister(Directories.OnTxnErr.THROW);
-            listed = new HashSet<>(lister.listFiles());
-            for (File f : files.get(cfm.cfName))
-            {
-                if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else
-                    assert listed.contains(f) : f + " is missing";
-            }
+    private void checkFiles(CFMetaData cfm, Directories directories)
+    {
+        Directories.SSTableLister lister;
+        Set<File> listed;// List all but no snapshot, backup
+        lister = directories.sstableLister(Directories.OnTxnErr.THROW);
+        listed = new HashSet<>(lister.listFiles());
+        for (File f : files.get(cfm.cfName))
+        {
+            if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else
+                assertTrue(f + " is missing", listed.contains(f));
+        }
 
-            // List all but including backup (but no snapshot)
-            lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
-            listed = new HashSet<>(lister.listFiles());
-            for (File f : files.get(cfm.cfName))
-            {
-                if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else
-                    assert listed.contains(f) : f + " is missing";
-            }
+        // List all but including backup (but no snapshot)
+        lister = directories.sstableLister(Directories.OnTxnErr.THROW).includeBackups(true);
+        listed = new HashSet<>(lister.listFiles());
+        for (File f : files.get(cfm.cfName))
+        {
+            if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else
+                assertTrue(f + " is missing", listed.contains(f));
+        }
 
-            // Skip temporary and compacted
-            lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
-            listed = new HashSet<>(lister.listFiles());
-            for (File f : files.get(cfm.cfName))
-            {
-                if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else if (f.getName().contains("tmp-"))
-                    assert !listed.contains(f) : f + " should not be listed";
-                else
-                    assert listed.contains(f) : f + " is missing";
-            }
+        // Skip temporary and compacted
+        lister = directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+        listed = new HashSet<>(lister.listFiles());
+        for (File f : files.get(cfm.cfName))
+        {
+            if (f.getPath().contains(Directories.SNAPSHOT_SUBDIR) || f.getPath().contains(Directories.BACKUPS_SUBDIR))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else if (f.getName().contains("tmp-"))
+                assertFalse(f + " should not be listed", listed.contains(f));
+            else
+                assertTrue(f + " is missing", listed.contains(f));
         }
     }
 
+    @Test
+    public void testTemporaryFile() throws IOException
+    {
+        for (CFMetaData cfm : CFM)
+        {
+            Directories directories = new Directories(cfm);
+
+            File tempDir = directories.getTemporaryWriteableDirectoryAsFile(10);
+            tempDir.mkdir();
+            File tempFile = new File(tempDir, "tempFile");
+            tempFile.createNewFile();
+
+            assertTrue(tempDir.exists());
+            assertTrue(tempFile.exists());
+
+            //make sure temp dir/file will not affect existing sstable listing
+            checkFiles(cfm, directories);
+
+            directories.removeTemporaryDirectories();
+
+            //make sure temp dir/file deletion will not affect existing sstable listing
+            checkFiles(cfm, directories);
+
+            assertFalse(tempDir.exists());
+            assertFalse(tempFile.exists());
+        }
+    }
 
     @Test
     public void testDiskFailurePolicy_best_effort()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 8b7ad1f..e50b461 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 9154d79..74b5c74 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.MmappedRegions;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -84,7 +84,7 @@ public class CompressedRandomAccessReaderTest
                 for (int i = 0; i < 20; i++)
                     writer.write("x".getBytes());
 
-                FileMark mark = writer.mark();
+                DataPosition mark = writer.mark();
                 // write enough garbage to create new chunks:
                 for (int i = 0; i < 40; ++i)
                     writer.write("y".getBytes());
@@ -126,7 +126,7 @@ public class CompressedRandomAccessReaderTest
                 : SequentialWriter.open(f))
             {
                 writer.write("The quick ".getBytes());
-                FileMark mark = writer.mark();
+                DataPosition mark = writer.mark();
                 writer.write("blue fox jumps over the lazy dog".getBytes());
 
                 // write enough to be sure to change chunk

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 56c83da..9b09f0b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.DataPosition;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriterTest;
 import org.apache.cassandra.schema.CompressionParams;
@@ -104,7 +104,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
                 dataPost.flip();
 
                 writer.write(dataPre);
-                FileMark mark = writer.mark();
+                DataPosition mark = writer.mark();
 
                 // Write enough garbage to transition chunk
                 for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 00727b8..4b9a769 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -21,14 +21,13 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
-import java.util.Set;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -37,16 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -55,7 +48,6 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamPlan;
@@ -63,8 +55,6 @@ import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 /**
  * Tests backwards compatibility for SSTables
  */
@@ -73,10 +63,7 @@ public class LegacySSTableTest
     private static final Logger logger = LoggerFactory.getLogger(LegacySSTableTest.class);
 
     public static final String LEGACY_SSTABLE_PROP = "legacy-sstable-root";
-    public static final String KSNAME = "Keyspace1";
-    public static final String CFNAME = "Standard1";
 
-    public static Set<String> TEST_DATA;
     public static File LEGACY_SSTABLE_ROOT;
 
     /**
@@ -104,58 +91,78 @@ public class LegacySSTableTest
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
-
-        CFMetaData metadata = CFMetaData.Builder.createDense(KSNAME, CFNAME, false, false)
-                                                .addPartitionKey("key", BytesType.instance)
-                                                .addClusteringColumn("column", BytesType.instance)
-                                                .addRegularColumn("value", BytesType.instance)
-                                                .build();
-
-        SchemaLoader.createKeyspace(KSNAME,
-                                    KeyspaceParams.simple(1),
-                                    metadata);
-        beforeClass();
-    }
-
-    public static void beforeClass()
-    {
+        StorageService.instance.initServer();
         Keyspace.setInitialized();
+        createKeyspace();
+        for (String legacyVersion : legacyVersions)
+        {
+            createTables(legacyVersion);
+        }
         String scp = System.getProperty(LEGACY_SSTABLE_PROP);
         assert scp != null;
         LEGACY_SSTABLE_ROOT = new File(scp).getAbsoluteFile();
         assert LEGACY_SSTABLE_ROOT.isDirectory();
+    }
 
-        TEST_DATA = new HashSet<String>();
-        for (int i = 100; i < 1000; ++i)
-            TEST_DATA.add(Integer.toString(i));
+    @After
+    public void tearDown()
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            truncateTables(legacyVersion);
+        }
     }
 
     /**
      * Get a descriptor for the legacy sstable at the given version.
      */
-    protected Descriptor getDescriptor(String ver)
+    protected Descriptor getDescriptor(String legacyVersion, String table)
     {
-        File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
-        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, SSTableFormat.Type.LEGACY);
+        return new Descriptor(legacyVersion, getTableDir(legacyVersion, table), "legacy_tables", table, 1,
+                              BigFormat.instance.getVersion(legacyVersion).hasNewFileName()?
+                              SSTableFormat.Type.BIG :SSTableFormat.Type.LEGACY);
     }
 
     @Test
-    public void testStreaming() throws Throwable
+    public void testLoadLegacyCqlTables() throws Exception
     {
-        StorageService.instance.initServer();
+        for (String legacyVersion : legacyVersions)
+        {
+            logger.info("Loading legacy version: {}", legacyVersion);
+            loadLegacyTables(legacyVersion);
+            CacheService.instance.invalidateKeyCache();
+            long startCount = CacheService.instance.keyCache.size();
+            verifyReads(legacyVersion);
+            verifyCache(legacyVersion, startCount);
+        }
+    }
+
+    @Test
+    public void testStreamLegacyCqlTables() throws Exception
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            streamLegacyTables(legacyVersion);
+            verifyReads(legacyVersion);
+        }
+    }
 
-        for (File version : LEGACY_SSTABLE_ROOT.listFiles())
+    private void streamLegacyTables(String legacyVersion) throws Exception
+    {
+        for (int compact = 0; compact <= 1; compact++)
         {
-            if (!new File(LEGACY_SSTABLE_ROOT + File.separator + version.getName() + File.separator + KSNAME).isDirectory())
-                continue;
-            if (Version.validate(version.getName()) && SSTableFormat.Type.LEGACY.info.getVersion(version.getName()).isCompatibleForStreaming())
-                testStreaming(version.getName());
+            logger.info("Streaming legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
+            streamLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
         }
     }
 
-    private void testStreaming(String version) throws Exception
+    private void streamLegacyTable(String tablePattern, String legacyVersion, String compactNameSuffix) throws Exception
     {
-        SSTableReader sstable = SSTableReader.open(getDescriptor(version));
+        String table = String.format(tablePattern, legacyVersion, compactNameSuffix);
+        SSTableReader sstable = SSTableReader.open(getDescriptor(legacyVersion, table));
         IPartitioner p = sstable.getPartitioner();
         List<Range<Token>> ranges = new ArrayList<>();
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
@@ -166,94 +173,41 @@ public class LegacySSTableTest
                                                                sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
                                              .execute().get();
-
-        ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
-        assert cfs.getLiveSSTables().size() == 1;
-        sstable = cfs.getLiveSSTables().iterator().next();
-        for (String keystring : TEST_DATA)
-        {
-            ByteBuffer key = bytes(keystring);
-
-            SliceableUnfilteredRowIterator iter = sstable.iterator(Util.dk(key), ColumnFilter.selectionBuilder().add(cfs.metadata.getColumnDefinition(bytes("name"))).build(), false, false);
-
-            // check not deleted (CASSANDRA-6527)
-            assert iter.partitionLevelDeletion().equals(DeletionTime.LIVE);
-            assert iter.next().clustering().get(0).equals(key);
-        }
-        sstable.selfRef().release();
-    }
-
-    @Test
-    public void testVersions() throws Throwable
-    {
-        boolean notSkipped = false;
-
-        for (File version : LEGACY_SSTABLE_ROOT.listFiles())
-        {
-            if (!new File(LEGACY_SSTABLE_ROOT + File.separator + version.getName() + File.separator + KSNAME).isDirectory())
-                continue;
-            if (Version.validate(version.getName()) && SSTableFormat.Type.LEGACY.info.getVersion(version.getName()).isCompatible())
-            {
-                notSkipped = true;
-                testVersion(version.getName());
-            }
-        }
-
-        assert notSkipped;
     }
 
-    public void testVersion(String version) throws Throwable
+    private static void loadLegacyTables(String legacyVersion) throws Exception
     {
-        try
-        {
-            ColumnFamilyStore cfs = Keyspace.open(KSNAME).getColumnFamilyStore(CFNAME);
-
-
-            SSTableReader reader = SSTableReader.open(getDescriptor(version));
-            for (String keystring : TEST_DATA)
-            {
-
-                ByteBuffer key = bytes(keystring);
-
-                SliceableUnfilteredRowIterator iter = reader.iterator(Util.dk(key), ColumnFilter.selection(cfs.metadata.partitionColumns()), false, false);
-
-                // check not deleted (CASSANDRA-6527)
-                assert iter.partitionLevelDeletion().equals(DeletionTime.LIVE);
-                assert iter.next().clustering().get(0).equals(key);
-            }
-
-            // TODO actually test some reads
-        }
-        catch (Throwable e)
+        for (int compact = 0; compact <= 1; compact++)
         {
-            System.err.println("Failed to read " + version);
-            throw e;
+            logger.info("Preparing legacy version {}{}", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact));
+            loadLegacyTable("legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact));
         }
     }
 
-    @Test
-    public void testLegacyCqlTables() throws Exception
+    private static void verifyCache(String legacyVersion, long startCount) throws InterruptedException, java.util.concurrent.ExecutionException
     {
-        createKeyspace();
-
-        loadLegacyTables();
+        //For https://issues.apache.org/jira/browse/CASSANDRA-10778
+        //Validate whether the key cache successfully saves in the presence of old keys as
+        //well as loads the correct number of keys
+        long endCount = CacheService.instance.keyCache.size();
+        Assert.assertTrue(endCount > startCount);
+        CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+        CacheService.instance.invalidateKeyCache();
+        Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
+        CacheService.instance.keyCache.loadSaved();
+        if (BigFormat.instance.getVersion(legacyVersion).storeRows())
+            Assert.assertEquals(endCount, CacheService.instance.keyCache.size());
+        else
+            Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
     }
 
-    private static void loadLegacyTables() throws Exception
+    private static void verifyReads(String legacyVersion)
     {
-        for (String legacyVersion : legacyVersions)
+        for (int compact = 0; compact <= 1; compact++)
         {
-            logger.info("Preparing legacy version {}", legacyVersion);
-
-            createTables(legacyVersion);
-
-            loadLegacyTable("legacy_%s_simple", legacyVersion);
-            loadLegacyTable("legacy_%s_simple_counter", legacyVersion);
-            loadLegacyTable("legacy_%s_clust", legacyVersion);
-            loadLegacyTable("legacy_%s_clust_counter", legacyVersion);
-
-            CacheService.instance.invalidateKeyCache();
-            long startCount = CacheService.instance.keyCache.size();
             for (int ck = 0; ck < 50; ck++)
             {
                 String ckValue = Integer.toString(ck) + longString;
@@ -265,58 +219,94 @@ public class LegacySSTableTest
                     UntypedResultSet rs;
                     if (ck == 0)
                     {
-                        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple WHERE pk=?", legacyVersion), pkValue);
-                        Assert.assertNotNull(rs);
-                        Assert.assertEquals(1, rs.size());
-                        Assert.assertEquals("foo bar baz", rs.one().getString("val"));
-                        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter WHERE pk=?", legacyVersion), pkValue);
-                        Assert.assertNotNull(rs);
-                        Assert.assertEquals(1, rs.size());
-                        Assert.assertEquals(1L, rs.one().getLong("val"));
+                        readSimpleTable(legacyVersion, getCompactNameSuffix(compact),  pkValue);
+                        readSimpleCounterTable(legacyVersion, getCompactNameSuffix(compact), pkValue);
                     }
 
-                    rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue);
-                    assertLegacyClustRows(1, rs);
-
-                    String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
-                    String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
-                    rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion), pkValue, ckValue, ckValue2, ckValue3);
-                    assertLegacyClustRows(3, rs);
-
-                    rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter WHERE pk=? AND ck=?", legacyVersion), pkValue, ckValue);
-                    Assert.assertNotNull(rs);
-                    Assert.assertEquals(1, rs.size());
-                    Assert.assertEquals(1L, rs.one().getLong("val"));
+                    readClusteringTable(legacyVersion, getCompactNameSuffix(compact), ck, ckValue, pkValue);
+                    readClusteringCounterTable(legacyVersion, getCompactNameSuffix(compact), ckValue, pkValue);
                 }
             }
-
-            //For https://issues.apache.org/jira/browse/CASSANDRA-10778
-            //Validate whether the key cache successfully saves in the presence of old keys as
-            //well as loads the correct number of keys
-            long endCount = CacheService.instance.keyCache.size();
-            Assert.assertTrue(endCount > startCount);
-            CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
-            CacheService.instance.invalidateKeyCache();
-            Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
-            CacheService.instance.keyCache.loadSaved();
-            if (BigFormat.instance.getVersion(legacyVersion).storeRows())
-                Assert.assertEquals(endCount, CacheService.instance.keyCache.size());
-            else
-                Assert.assertEquals(startCount, CacheService.instance.keyCache.size());
         }
     }
 
-    private void createKeyspace()
+    private static void readClusteringCounterTable(String legacyVersion, String compactSuffix, String ckValue, String pkValue)
+    {
+        logger.debug("Read legacy_{}_clust_counter{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust_counter%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
+        Assert.assertNotNull(rs);
+        Assert.assertEquals(1, rs.size());
+        Assert.assertEquals(1L, rs.one().getLong("val"));
+    }
+
+    private static void readClusteringTable(String legacyVersion, String compactSuffix, int ck, String ckValue, String pkValue)
+    {
+        logger.debug("Read legacy_{}_clust{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck=?", legacyVersion, compactSuffix), pkValue, ckValue);
+        assertLegacyClustRows(1, rs);
+
+        String ckValue2 = Integer.toString(ck < 10 ? 40 : ck - 1) + longString;
+        String ckValue3 = Integer.toString(ck > 39 ? 10 : ck + 1) + longString;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_clust%s WHERE pk=? AND ck IN (?, ?, ?)", legacyVersion, compactSuffix), pkValue, ckValue, ckValue2, ckValue3);
+        assertLegacyClustRows(3, rs);
+    }
+
+    private static void readSimpleCounterTable(String legacyVersion, String compactSuffix, String pkValue)
+    {
+        logger.debug("Read legacy_{}_simple_counter{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple_counter%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
+        Assert.assertNotNull(rs);
+        Assert.assertEquals(1, rs.size());
+        Assert.assertEquals(1L, rs.one().getLong("val"));
+    }
+
+    private static void readSimpleTable(String legacyVersion, String compactSuffix, String pkValue)
+    {
+        logger.debug("Read simple: legacy_{}_simple{}", legacyVersion, compactSuffix);
+        UntypedResultSet rs;
+        rs = QueryProcessor.executeInternal(String.format("SELECT val FROM legacy_tables.legacy_%s_simple%s WHERE pk=?", legacyVersion, compactSuffix), pkValue);
+        Assert.assertNotNull(rs);
+        Assert.assertEquals(1, rs.size());
+        Assert.assertEquals("foo bar baz", rs.one().getString("val"));
+    }
+
+    private static void createKeyspace()
     {
         QueryProcessor.executeInternal("CREATE KEYSPACE legacy_tables WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
     }
 
     private static void createTables(String legacyVersion)
     {
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple (pk text PRIMARY KEY, val text)", legacyVersion));
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter (pk text PRIMARY KEY, val counter)", legacyVersion));
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust (pk text, ck text, val text, PRIMARY KEY (pk, ck))", legacyVersion));
-        QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter (pk text, ck text, val counter, PRIMARY KEY (pk, ck))", legacyVersion));
+        for (int i=0; i<=1; i++)
+        {
+            String compactSuffix = getCompactNameSuffix(i);
+            String tableSuffix = i == 0? "" : " WITH COMPACT STORAGE";
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple%s (pk text PRIMARY KEY, val text)%s", legacyVersion, compactSuffix, tableSuffix));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_simple_counter%s (pk text PRIMARY KEY, val counter)%s", legacyVersion, compactSuffix, tableSuffix));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust%s (pk text, ck text, val text, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
+            QueryProcessor.executeInternal(String.format("CREATE TABLE legacy_tables.legacy_%s_clust_counter%s (pk text, ck text, val counter, PRIMARY KEY (pk, ck))%s", legacyVersion, compactSuffix, tableSuffix));
+        }
+    }
+
+    private static String getCompactNameSuffix(int i)
+    {
+        return i == 0? "" : "_compact";
+    }
+
+    private static void truncateTables(String legacyVersion)
+    {
+        for (int compact = 0; compact <= 1; compact++)
+        {
+            QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple%s", legacyVersion, getCompactNameSuffix(compact)));
+            QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_simple_counter%s", legacyVersion, getCompactNameSuffix(compact)));
+            QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust%s", legacyVersion, getCompactNameSuffix(compact)));
+            QueryProcessor.executeInternal(String.format("TRUNCATE legacy_tables.legacy_%s_clust_counter%s", legacyVersion, getCompactNameSuffix(compact)));
+        }
+        CacheService.instance.invalidateCounterCache();
+        CacheService.instance.invalidateKeyCache();
     }
 
     private static void assertLegacyClustRows(int count, UntypedResultSet rs)
@@ -332,9 +322,9 @@ public class LegacySSTableTest
         }
     }
 
-    private static void loadLegacyTable(String tablePattern, String legacyVersion) throws IOException
+    private static void loadLegacyTable(String tablePattern, String legacyVersion, String compactSuffix) throws IOException
     {
-        String table = String.format(tablePattern, legacyVersion);
+        String table = String.format(tablePattern, legacyVersion, compactSuffix);
 
         logger.info("Loading legacy table {}", table);
 
@@ -349,7 +339,7 @@ public class LegacySSTableTest
     }
 
     /**
-     * Generates sstables for 4 CQL tables (see {@link #createTables(String)}) in <i>current</i>
+     * Generates sstables for 8 CQL tables (see {@link #createTables(String)}) in <i>current</i>
      * sstable format (version) into {@code test/data/legacy-sstables/VERSION}, where
      * {@code VERSION} matches {@link Version#getVersion() BigFormat.latestVersion.getVersion()}.
      * <p>
@@ -357,13 +347,10 @@ public class LegacySSTableTest
      * during development. I.e. remove the {@code @Ignore} annotation temporarily.
      * </p>
      */
-    @Test
     @Ignore
+    @Test
     public void testGenerateSstables() throws Throwable
     {
-        createKeyspace();
-        createTables(BigFormat.latestVersion.getVersion());
-
         Random rand = new Random();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < 128; i++)
@@ -372,25 +359,28 @@ public class LegacySSTableTest
         }
         String randomString = sb.toString();
 
-        for (int pk = 0; pk < 5; pk++)
+        for (int compact = 0; compact <= 1; compact++)
         {
-            String valPk = Integer.toString(pk);
-            QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple (pk, val) VALUES ('%s', '%s')",
-                                                         BigFormat.latestVersion, valPk, "foo bar baz"));
+            for (int pk = 0; pk < 5; pk++)
+            {
+                String valPk = Integer.toString(pk);
+                QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_simple%s (pk, val) VALUES ('%s', '%s')",
+                                                             BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, "foo bar baz"));
 
-            QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter SET val = val + 1 WHERE pk = '%s'",
-                                                         BigFormat.latestVersion, valPk));
+                QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_simple_counter%s SET val = val + 1 WHERE pk = '%s'",
+                                                             BigFormat.latestVersion, getCompactNameSuffix(compact), valPk));
 
-            for (int ck = 0; ck < 50; ck++)
-            {
-                String valCk = Integer.toString(ck);
+                for (int ck = 0; ck < 50; ck++)
+                {
+                    String valCk = Integer.toString(ck);
 
-                QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust (pk, ck, val) VALUES ('%s', '%s', '%s')",
-                                                             BigFormat.latestVersion, valPk, valCk + longString, randomString));
+                    QueryProcessor.executeInternal(String.format("INSERT INTO legacy_tables.legacy_%s_clust%s (pk, ck, val) VALUES ('%s', '%s', '%s')",
+                                                                 BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString, randomString));
 
-                QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter SET val = val + 1 WHERE pk = '%s' AND ck='%s'",
-                                                             BigFormat.latestVersion, valPk, valCk + longString));
+                    QueryProcessor.executeInternal(String.format("UPDATE legacy_tables.legacy_%s_clust_counter%s SET val = val + 1 WHERE pk = '%s' AND ck='%s'",
+                                                                 BigFormat.latestVersion, getCompactNameSuffix(compact), valPk, valCk + longString));
 
+                }
             }
         }
 
@@ -398,10 +388,13 @@ public class LegacySSTableTest
 
         File ksDir = new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables", BigFormat.latestVersion));
         ksDir.mkdirs();
-        copySstablesFromTestData(String.format("legacy_%s_simple", BigFormat.latestVersion), ksDir);
-        copySstablesFromTestData(String.format("legacy_%s_simple_counter", BigFormat.latestVersion), ksDir);
-        copySstablesFromTestData(String.format("legacy_%s_clust", BigFormat.latestVersion), ksDir);
-        copySstablesFromTestData(String.format("legacy_%s_clust_counter", BigFormat.latestVersion), ksDir);
+        for (int compact = 0; compact <= 1; compact++)
+        {
+            copySstablesFromTestData(String.format("legacy_%s_simple%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+            copySstablesFromTestData(String.format("legacy_%s_simple_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+            copySstablesFromTestData(String.format("legacy_%s_clust%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+            copySstablesFromTestData(String.format("legacy_%s_clust_counter%s", BigFormat.latestVersion, getCompactNameSuffix(compact)), ksDir);
+        }
     }
 
     private void copySstablesFromTestData(String table, File ksDir) throws IOException
@@ -420,12 +413,17 @@ public class LegacySSTableTest
 
     private static void copySstablesToTestData(String legacyVersion, String table, File cfDir) throws IOException
     {
-        for (File file : new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table)).listFiles())
+        for (File file : getTableDir(legacyVersion, table).listFiles())
         {
             copyFile(cfDir, file);
         }
     }
 
+    private static File getTableDir(String legacyVersion, String table)
+    {
+        return new File(LEGACY_SSTABLE_ROOT, String.format("%s/legacy_tables/%s", legacyVersion, table));
+    }
+
     private static void copyFile(File cfDir, File file) throws IOException
     {
         byte[] buf = new byte[65536];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index c3a4539..360d262 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -406,7 +406,7 @@ public class BufferedRandomAccessFileTest
             assert tmpFile.getPath().equals(r.getPath());
 
             // Create a mark and move the rw there.
-            final FileMark mark = r.mark();
+            final DataPosition mark = r.mark();
             r.reset(mark);
 
             // Expect this call to succeed.
@@ -457,7 +457,7 @@ public class BufferedRandomAccessFileTest
         RandomAccessReader file = RandomAccessReader.open(channel);
 
         file.seek(10);
-        FileMark mark = file.mark();
+        DataPosition mark = file.mark();
 
         file.seek(file.length());
         assertTrue(file.isEOF());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
index f0d4383..e83c015 100644
--- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@ -356,7 +356,7 @@ public class RandomAccessReaderTest
             assertFalse(reader.isEOF());
             assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
 
-            FileMark mark = reader.mark();
+            DataPosition mark = reader.mark();
             assertEquals(0, reader.bytesPastMark());
             assertEquals(0, reader.bytesPastMark(mark));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
new file mode 100644
index 0000000..175ab53
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RewindableDataInputStreamPlusTest
+{
+
+    private final int INITIAL_BUFFER_SIZE = 1;
+
+    private File file;
+
+    @Before
+    public void setup() throws Exception
+    {
+        this.file = new File(System.getProperty("java.io.tmpdir"), "subdir/test.buffer");
+    }
+
+    @Test
+    public void testMarkAndResetSimple() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            // int
+            out.writeInt(1);
+            // long
+            out.writeLong(1L);
+            // float
+            out.writeFloat(1.0f);
+            // double
+            out.writeDouble(1.0d);
+
+            // String
+            out.writeUTF("abc");
+            testData = baos.toByteArray();
+        }
+
+        for (int memCapacity = 0; memCapacity <= 16; memCapacity++)
+        {
+            int diskCapacity = 16 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                try {
+                    //should mark before resetting
+                    reader.reset(null);
+                    fail("Should have thrown IOException");
+                } catch (IOException e) {}
+
+                assertTrue(reader.readBoolean());
+
+                reader.mark();
+
+                try {
+                    //cannot mark already marked stream
+                    reader.mark();
+                    fail("Should have thrown IllegalStateException");
+                } catch (IllegalStateException e) {}
+
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                assertEquals(3, reader.bytesPastMark(null));
+                reader.reset(null);
+
+                try {
+                    //cannot mark when reading from cache
+                    reader.mark();
+                    fail("Should have thrown IllegalStateException");
+                } catch (IllegalStateException e) {}
+
+                //read again previous sequence
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                //finish reading again previous sequence
+                assertEquals(1, reader.readShort());
+
+                reader.mark();
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+                assertEquals(1.0f, reader.readFloat(), 0);
+                assertEquals(16, reader.bytesPastMark(null));
+                reader.reset(null);
+
+                //read again previous sequence
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+                assertEquals(1.0f, reader.readFloat(), 0);
+                //finish reading again previous sequence
+
+                //mark again
+                reader.mark();
+                assertEquals(1.0d, reader.readDouble(), 0);
+                assertEquals(8, reader.bytesPastMark(null));
+                reader.reset(null);
+
+                //read again previous sequence
+                assertEquals(1.0d, reader.readDouble(), 0);
+                //finish reading again previous sequence
+
+                //mark and reset
+                reader.mark();
+                reader.reset(null);
+
+                assertEquals("abc", reader.readUTF());
+
+                //check max file size
+                assertEquals(diskCapacity, file.length());
+            }
+            assertFalse(file.exists());
+        }
+    }
+
+    @Test
+    public void testVeryLargeCapacity() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            out.writeUTF("abc");
+            testData = baos.toByteArray();
+        }
+
+        try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                      INITIAL_BUFFER_SIZE, Integer.MAX_VALUE, file,
+                                                                                      Integer.MAX_VALUE))
+        {
+            reader.mark();
+            assertEquals("abc", reader.readUTF());
+            reader.reset();
+            assertEquals("abc", reader.readUTF());
+        }
+        assertFalse(file.exists());
+
+
+        baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            out.writeBoolean(true);
+            out.writeBoolean(true);
+            testData = baos.toByteArray();
+        }
+    }
+
+    @Test
+    public void testMarkAndResetBigBuffer() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            // int
+            out.writeInt(1);
+            // long
+            out.writeLong(1L);
+            // float
+            out.writeFloat(1.0f);
+            // double
+            out.writeDouble(1.0d);
+
+            // String
+            out.writeUTF("abc");
+            testData = baos.toByteArray();
+
+            // 1 (boolean) + 1 (byte) + 2 (char) + 2 (short) + 4 (int) + 8 (long)
+            // + 4 (float) + 8 (double) + 5 bytes (utf string)
+        }
+
+        for (int memCapacity = 0; memCapacity <= 18; memCapacity++)
+        {
+            int diskCapacity = 18 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                //read a big amount before resetting
+                reader.mark();
+                assertTrue(reader.readBoolean());
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                assertEquals(1, reader.readShort());
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+                reader.reset();
+
+                //read from buffer
+                assertTrue(reader.readBoolean());
+                assertEquals(0x1, reader.readByte());
+                assertEquals('a', reader.readChar());
+                assertEquals(1, reader.readShort());
+                assertEquals(1, reader.readInt());
+                assertEquals(1L, reader.readLong());
+
+                assertEquals(17, reader.available());
+
+                //mark again
+                reader.mark();
+                assertEquals(1.0f, reader.readFloat(), 0);
+                assertEquals(1.0d, reader.readDouble(), 0);
+                assertEquals("abc", reader.readUTF());
+                reader.reset();
+
+                assertEquals(17, reader.available());
+
+                assertEquals(1.0f, reader.readFloat(), 0);
+                assertEquals(1.0d, reader.readDouble(), 0);
+                assertEquals("abc", reader.readUTF());
+            }
+            assertFalse(file.exists());
+        }
+    }
+
+
+    @Test
+    public void testCircularSpillFile() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            // int
+            out.writeInt(1);
+
+            // String
+            out.writeUTF("ab");
+            testData = baos.toByteArray();
+
+            // 1 (boolean) + 1 (byte) + 2 (char) + 2 (short) + 4 (int) + 4 bytes (utf string)
+        }
+
+        //read at most 4 bytes multiple times (and then check file size)
+        int MEM_SIZE = 0;
+        int DISK_SIZE = 4;
+        try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                      INITIAL_BUFFER_SIZE, MEM_SIZE, file,
+                                                                                      DISK_SIZE))
+        {
+            //read 2 bytes and reset
+            reader.mark();
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            assertEquals(2, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            //finish reading again previous sequence
+
+            //read 4 bytes and reset
+            reader.mark();
+            assertEquals('a', reader.readChar());
+            assertEquals(1, reader.readShort());
+            assertEquals(4, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertEquals('a', reader.readChar());
+            assertEquals(1, reader.readShort());
+            //finish reading again previous sequence
+
+            //read 4 bytes and reset
+            reader.mark();
+            assertEquals(1, reader.readInt());
+            assertEquals(4, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertEquals(1, reader.readInt());
+
+            //check max file size
+            assertEquals(DISK_SIZE, file.length());
+        }
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testExhaustCapacity() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // boolean
+            out.writeBoolean(true);
+            // byte
+            out.writeByte(0x1);
+            // char
+            out.writeChar('a');
+            // short
+            out.writeShort(1);
+            testData = baos.toByteArray();
+        }
+
+        //test capacity exhausted when reading more than 4 bytes
+        testCapacityExhausted(testData, 0, 2);
+        testCapacityExhausted(testData, 2, 0);
+        testCapacityExhausted(testData, 1, 1);
+    }
+
+    private void testCapacityExhausted(byte[] testData, int memSize, int diskSize) throws IOException
+    {
+        try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                      INITIAL_BUFFER_SIZE, memSize, file,
+                                                                                      diskSize))
+        {
+            //read 2 bytes and reset
+            reader.mark();
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            assertEquals(2, reader.bytesPastMark(null));
+            reader.reset();
+
+            //read again previous sequence
+            assertTrue(reader.readBoolean());
+            assertEquals(0x1, reader.readByte());
+            //finish reading again previous sequence
+
+            reader.mark();
+            //read 3 bytes - OK
+            assertEquals('a', reader.readChar());
+            //read 1 more bytes - CAPACITY will exhaust when trying to reset :(
+            assertEquals(1, reader.readShort());
+
+            try
+            {
+                reader.reset();
+                fail("Should have thrown IOException");
+            }
+            catch (IOException e) {}
+
+            //check max file size
+            assertEquals(diskSize, file.length());
+        }
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testMarkAndResetUnsignedRead() throws Exception
+    {
+        byte[] testData;
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(baos))
+        {
+            // byte
+            out.writeByte(0x1);
+            // short
+            out.writeShort(2);
+            testData = baos.toByteArray();
+        }
+
+        for (int memCapacity = 0; memCapacity <= 1; memCapacity++)
+        {
+            int diskCapacity = 1 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                reader.mark();
+                assertEquals(1, reader.readUnsignedByte());
+                reader.reset();
+                assertEquals(1, reader.readUnsignedByte());
+
+                //will read first byte of short 2
+                reader.mark();
+                assertEquals(0, reader.readUnsignedByte());
+                reader.reset();
+
+                assertEquals(2, reader.readUnsignedShort());
+
+                reader.mark();
+                reader.reset();
+                assertEquals(0, reader.available());
+            }
+        }
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testMarkAndResetSkipBytes() throws Exception
+    {
+        String testStr = "1234567890";
+        byte[] testData = testStr.getBytes();
+
+        for (int memCapacity = 0; memCapacity <= 7; memCapacity++)
+        {
+            int diskCapacity = 7 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                reader.mark();
+                // read first 5 bytes and rewind
+                byte[] out = new byte[5];
+                reader.readFully(out, 0, 5);
+                assertEquals("12345", new String(out));
+
+                // then skip 2 bytes (67)
+                reader.skipBytes(2);
+
+                assertEquals(7, reader.bytesPastMark(null));
+                reader.reset();
+
+                //now read part of the previously skipped bytes
+                out = new byte[5];
+                reader.readFully(out);
+                assertEquals("12345", new String(out));
+
+                //skip 3 bytes (2 from cache, 1 from stream)
+                reader.skip(3);
+
+                // mark and read 2 more bytes
+                reader.mark();
+                out = new byte[2];
+                reader.readFully(out);
+                assertEquals("90", new String(out));
+                assertEquals(0, reader.available());
+                reader.reset();
+
+                //reset and read only the next byte "9" in the third position
+                reader.readFully(out, 1, 1);
+                assertEquals("99", new String(out));
+
+                //now we read the remainder via readline
+                assertEquals(1, reader.available());
+                assertEquals("0", reader.readLine());
+
+            }
+            assertFalse(file.exists());
+        }
+    }
+
+    @Test
+    public void testMarkAndResetReadFully() throws Exception
+    {
+        String testStr = "1234567890";
+        byte[] testData = testStr.getBytes();
+
+        for (int memCapacity = 0; memCapacity <= 5; memCapacity++)
+        {
+            int diskCapacity = 5 - memCapacity;
+            try (RewindableDataInputStreamPlus reader = new RewindableDataInputStreamPlus(new ByteArrayInputStream(testData),
+                                                                                          INITIAL_BUFFER_SIZE, memCapacity, file,
+                                                                                          diskCapacity))
+            {
+                reader.mark();
+                // read first 5 bytes and rewind
+                byte[] out = new byte[5];
+                reader.readFully(out, 0, 5);
+                assertEquals("12345", new String(out));
+                reader.reset();
+
+                // read half from cache, half from parent stream
+                out = new byte[7];
+                reader.readFully(out);
+                assertEquals("1234567", new String(out));
+
+                // mark and read 3 more bytes
+                reader.mark();
+                out = new byte[3];
+                reader.readFully(out);
+                assertEquals("890", new String(out));
+                assertEquals(0, reader.available());
+                reader.reset();
+
+                //reset and read only the next byte "8" in the third position
+                reader.readFully(out, 2, 1);
+                assertEquals("898", new String(out));
+
+                //now we read the remainder via readline
+                assertEquals(2, reader.available());
+                assertEquals("90", reader.readLine());
+            }
+            assertFalse(file.exists());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8651b66/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java b/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
index 221e55c..7693b45 100644
--- a/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.cassandra.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -25,8 +29,10 @@ import java.io.DataOutputStream;
 
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.apache.cassandra.io.util.BytesReadTracker;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.io.util.TrackedInputStream;
 
 public class BytesReadTrackerTest
 {
@@ -34,6 +40,33 @@ public class BytesReadTrackerTest
     @Test
     public void testBytesRead() throws Exception
     {
+        internalTestBytesRead(true);
+        internalTestBytesRead(false);
+    }
+
+    @Test
+    public void testUnsignedRead() throws Exception
+    {
+        internalTestUnsignedRead(true);
+        internalTestUnsignedRead(false);
+    }
+
+    @Test
+    public void testSkipBytesAndReadFully() throws Exception
+    {
+        internalTestSkipBytesAndReadFully(true);
+        internalTestSkipBytesAndReadFully(false);
+    }
+
+    @Test
+    public void testReadLine() throws Exception
+    {
+        internalTestReadLine(true);
+        internalTestReadLine(false);
+    }
+
+    public void internalTestBytesRead(boolean inputStream) throws Exception
+    {
         byte[] testData;
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -66,45 +99,46 @@ public class BytesReadTrackerTest
             out.close();
         }
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+        BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
 
         try
         {
             // boolean = 1byte
-            boolean bool = tracker.readBoolean();
+            boolean bool = reader.readBoolean();
             assertTrue(bool);
             assertEquals(1, tracker.getBytesRead());
             // byte = 1byte
-            byte b = tracker.readByte();
+            byte b = reader.readByte();
             assertEquals(b, 0x1);
             assertEquals(2, tracker.getBytesRead());
             // char = 2byte
-            char c = tracker.readChar();
+            char c = reader.readChar();
             assertEquals('a', c);
             assertEquals(4, tracker.getBytesRead());
             // short = 2bytes
-            short s = tracker.readShort();
+            short s = reader.readShort();
             assertEquals(1, s);
             assertEquals((short) 6, tracker.getBytesRead());
             // int = 4bytes
-            int i = tracker.readInt();
+            int i = reader.readInt();
             assertEquals(1, i);
             assertEquals(10, tracker.getBytesRead());
             // long = 8bytes
-            long l = tracker.readLong();
+            long l = reader.readLong();
             assertEquals(1L, l);
             assertEquals(18, tracker.getBytesRead());
             // float = 4bytes
-            float f = tracker.readFloat();
+            float f = reader.readFloat();
             assertEquals(1.0f, f, 0);
             assertEquals(22, tracker.getBytesRead());
             // double = 8bytes
-            double d = tracker.readDouble();
+            double d = reader.readDouble();
             assertEquals(1.0d, d, 0);
             assertEquals(30, tracker.getBytesRead());
             // String("abc") = 2(string size) + 3 = 5 bytes
-            String str = tracker.readUTF();
+            String str = reader.readUTF();
             assertEquals("abc", str);
             assertEquals(35, tracker.getBytesRead());
 
@@ -119,8 +153,7 @@ public class BytesReadTrackerTest
         assertEquals(0, tracker.getBytesRead());
     }
 
-    @Test
-    public void testUnsignedRead() throws Exception
+    public void internalTestUnsignedRead(boolean inputStream) throws Exception
     {
         byte[] testData;
 
@@ -139,17 +172,18 @@ public class BytesReadTrackerTest
             out.close();
         }
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+        BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
 
         try
         {
             // byte = 1byte
-            int b = tracker.readUnsignedByte();
+            int b = reader.readUnsignedByte();
             assertEquals(b, 1);
             assertEquals(1, tracker.getBytesRead());
             // short = 2bytes
-            int s = tracker.readUnsignedShort();
+            int s = reader.readUnsignedShort();
             assertEquals(1, s);
             assertEquals(3, tracker.getBytesRead());
 
@@ -161,30 +195,30 @@ public class BytesReadTrackerTest
         }
     }
 
-    @Test
-    public void testSkipBytesAndReadFully() throws Exception
+    public void internalTestSkipBytesAndReadFully(boolean inputStream) throws Exception
     {
         String testStr = "1234567890";
         byte[] testData = testStr.getBytes();
 
-        DataInputStream in = new DataInputStream(new ByteArrayInputStream(testData));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        DataInputPlus.DataInputStreamPlus in = new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(testData));
+        BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
 
         try
         {
             // read first 5 bytes
             byte[] out = new byte[5];
-            tracker.readFully(out, 0, 5);
+            reader.readFully(out, 0, 5);
             assertEquals("12345", new String(out));
             assertEquals(5, tracker.getBytesRead());
 
             // then skip 2 bytes
-            tracker.skipBytes(2);
+            reader.skipBytes(2);
             assertEquals(7, tracker.getBytesRead());
 
             // and read the rest
             out = new byte[3];
-            tracker.readFully(out);
+            reader.readFully(out);
             assertEquals("890", new String(out));
             assertEquals(10, tracker.getBytesRead());
 
@@ -196,16 +230,24 @@ public class BytesReadTrackerTest
         }
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void testReadLine() throws Exception
+    public void internalTestReadLine(boolean inputStream) throws Exception
     {
         DataInputStream in = new DataInputStream(new ByteArrayInputStream("1".getBytes()));
-        BytesReadTracker tracker = new BytesReadTracker(in);
+        BytesReadTracker tracker = inputStream? new TrackedInputStream(in) : new TrackedDataInputPlus(in);
+        DataInputPlus reader = inputStream? new DataInputPlus.DataInputStreamPlus((TrackedInputStream)tracker) : (DataInputPlus) tracker;
 
         try
         {
-            // throws UnsupportedOperationException
-            tracker.readLine();
+            String line = reader.readLine();
+            if (inputStream)
+                assertEquals(line, "1");
+            else
+                fail("Should have thrown UnsupportedOperationException");
+        }
+        catch (UnsupportedOperationException e)
+        {
+            if (inputStream)
+                fail("Should have not thrown UnsupportedOperationException");
         }
         finally
         {