Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/07/06 14:38:50 UTC

[3/3] cassandra git commit: Remove DatabaseDescriptor dependency from SegmentedFile

Remove DatabaseDescriptor dependency from SegmentedFile

patch by yukim; reviewed by Stefania Alborghetti for CASSANDRA-11580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b4133f38
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b4133f38
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b4133f38

Branch: refs/heads/trunk
Commit: b4133f38d5ef5fc50047eb4a31307ac97c5b72ee
Parents: a4e7387
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jul 6 06:08:17 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 6 09:36:40 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |   4 +-
 .../org/apache/cassandra/cache/ChunkCache.java  |   2 +-
 .../cassandra/config/DatabaseDescriptor.java    |  35 +-
 .../org/apache/cassandra/db/RowIndexEntry.java  |  10 +-
 .../columniterator/AbstractSSTableIterator.java |   8 +-
 .../db/columniterator/SSTableIterator.java      |   4 +-
 .../columniterator/SSTableReversedIterator.java |   4 +-
 .../cassandra/db/commitlog/CommitLogReader.java |   3 +-
 .../apache/cassandra/io/sstable/Descriptor.java |  15 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   9 +-
 .../io/sstable/format/SSTableReader.java        | 104 ++---
 .../io/sstable/format/SSTableWriter.java        |   3 +-
 .../io/sstable/format/big/BigTableWriter.java   |  57 ++-
 .../io/util/AbstractReaderFileProxy.java        |   2 +-
 .../io/util/BufferManagingRebufferer.java       |  12 +-
 .../io/util/BufferedSegmentedFile.java          |  58 ---
 .../io/util/ChecksummedRandomAccessReader.java  | 101 +----
 .../io/util/ChecksummedRebufferer.java          |  76 ++++
 .../apache/cassandra/io/util/ChunkReader.java   |   5 -
 .../io/util/CompressedChunkReader.java          | 227 ++++++++++
 .../io/util/CompressedSegmentedFile.java        | 358 ---------------
 .../cassandra/io/util/CorruptFileException.java |  31 ++
 .../io/util/DiskOptimizationStrategy.java       |  48 ++
 .../apache/cassandra/io/util/FileHandle.java    | 440 +++++++++++++++++++
 .../cassandra/io/util/ICompressedFile.java      |  26 --
 .../cassandra/io/util/MmapRebufferer.java       |   2 +-
 .../cassandra/io/util/MmappedRegions.java       |   5 +
 .../cassandra/io/util/MmappedSegmentedFile.java | 117 -----
 .../cassandra/io/util/RandomAccessReader.java   | 192 ++------
 .../apache/cassandra/io/util/Rebufferer.java    |   4 +-
 .../apache/cassandra/io/util/SegmentedFile.java | 318 --------------
 .../cassandra/io/util/SimpleChunkReader.java    |  10 +-
 .../util/SpinningDiskOptimizationStrategy.java  |  31 ++
 .../io/util/SsdDiskOptimizationStrategy.java    |  49 +++
 test/unit/org/apache/cassandra/MockSchema.java  |  12 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   6 +-
 .../apache/cassandra/db/DirectoriesTest.java    |  15 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |   3 +-
 .../db/lifecycle/LogTransactionTest.java        |  12 +-
 .../CompressedRandomAccessReaderTest.java       | 223 +++++-----
 .../CompressedSequentialWriterTest.java         |  69 ++-
 .../cassandra/io/sstable/DescriptorTest.java    |   6 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  10 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   3 +-
 .../metadata/MetadataSerializerTest.java        |   8 +-
 .../io/util/BufferedRandomAccessFileTest.java   | 404 +++++++++--------
 .../util/ChecksummedRandomAccessReaderTest.java |  50 +--
 .../io/util/DiskOptimizationStrategyTest.java   |  85 ++++
 .../io/util/RandomAccessReaderTest.java         |  77 ++--
 .../cassandra/io/util/SegmentedFileTest.java    |  88 ----
 51 files changed, 1615 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------
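
The diffs below collapse the SegmentedFile hierarchy (BufferedSegmentedFile, CompressedSegmentedFile, MmappedSegmentedFile and their per-access-mode builders) into a single FileHandle with a fluent builder, and route buffer sizing through the new DiskOptimizationStrategy interface so that io.util no longer reaches into DatabaseDescriptor. A condensed sketch of the new call pattern, assembled from the SSTableReader hunks below rather than quoted verbatim:

    // Old (removed): access-mode-specific builders chosen via DatabaseDescriptor.
    //   SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression);
    //   SegmentedFile dfile = dbuilder.buildData(descriptor, sstableMetadata);
    // New: one closeable builder; access mode, compression and cache are passed explicitly.
    try (FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA))
                                           .compressed(compression)
                                           .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
                                           .withChunkCache(ChunkCache.instance))
    {
        FileHandle dfile = dbuilder.bufferSize(dataBufferSize).complete();
    }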


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e2444a..f5d1d01 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
  * Add supplied username to authentication error messages (CASSANDRA-12076)
  * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index cb2ad8a..b98ad53 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
+import org.apache.cassandra.io.util.CorruptFileException;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -91,7 +91,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
         public InputStream getInputStream(File dataPath, File crcPath) throws IOException
         {
-            return new ChecksummedRandomAccessReader.Builder(dataPath, crcPath).build();
+            return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
         public OutputStream getOutputStream(File dataPath, File crcPath)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/cache/ChunkCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java
index e6296bd..0845944 100644
--- a/src/java/org/apache/cassandra/cache/ChunkCache.java
+++ b/src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -184,7 +184,7 @@ public class ChunkCache
         return instance.wrap(file);
     }
 
-    public void invalidatePosition(SegmentedFile dfile, long position)
+    public void invalidatePosition(FileHandle dfile, long position)
     {
         if (!(dfile.rebuffererFactory() instanceof CachingRebufferer))
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1375a39..be62f10 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -46,7 +46,10 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
+import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.scheduler.IRequestScheduler;
@@ -108,6 +111,8 @@ public class DatabaseDescriptor
     private static EncryptionContext encryptionContext;
     private static boolean hasLoggedConfig;
 
+    private static DiskOptimizationStrategy diskOptimizationStrategy;
+
     public static void forceStaticInitialization() {}
     static
     {
@@ -124,6 +129,15 @@ public class DatabaseDescriptor
             {
                 applyConfig(loadConfig());
             }
+            switch (conf.disk_optimization_strategy)
+            {
+                case ssd:
+                    diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
+                    break;
+                case spinning:
+                    diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+                    break;
+            }
         }
         catch (Exception e)
         {
@@ -1862,15 +1876,9 @@ public class DatabaseDescriptor
         return conf.buffer_pool_use_heap_if_exhausted;
     }
 
-    public static Config.DiskOptimizationStrategy getDiskOptimizationStrategy()
-    {
-        return conf.disk_optimization_strategy;
-    }
-
-    @VisibleForTesting
-    public static void setDiskOptimizationStrategy(Config.DiskOptimizationStrategy strategy)
+    public static DiskOptimizationStrategy getDiskOptimizationStrategy()
     {
-        conf.disk_optimization_strategy = strategy;
+        return diskOptimizationStrategy;
     }
 
     public static double getDiskOptimizationEstimatePercentile()
@@ -1878,17 +1886,6 @@ public class DatabaseDescriptor
         return conf.disk_optimization_estimate_percentile;
     }
 
-    public static double getDiskOptimizationPageCrossChance()
-    {
-        return conf.disk_optimization_page_cross_chance;
-    }
-
-    @VisibleForTesting
-    public static void setDiskOptimizationPageCrossChance(double chance)
-    {
-        conf.disk_optimization_page_cross_chance = chance;
-    }
-
     public static long getTotalCommitlogSpaceInMB()
     {
         return conf.commitlog_total_space_in_mb;
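
The strategy is now built once at startup from the two existing yaml knobs (disk_optimization_strategy and disk_optimization_page_cross_chance), and callers get a DiskOptimizationStrategy object instead of the raw config enum, which also removes the test-only setters. The new strategy classes themselves do not appear in this excerpt; a plausible shape, inferred only from the call sites in the hunks below (estimated record size in, read-buffer size out), might be:

    // Hypothetical sketch only -- the real DiskOptimizationStrategy.java,
    // SsdDiskOptimizationStrategy.java and SpinningDiskOptimizationStrategy.java
    // added by this commit are not shown in this message.
    public interface DiskOptimizationStrategy
    {
        int MIN_BUFFER_SIZE = 1 << 12;       // one 4 KiB page

        int bufferSize(long recordSize);     // preferred read-buffer size for records of this size

        default int roundBufferSize(long size)
        {
            if (size <= 0)
                return MIN_BUFFER_SIZE;
            size = (size + MIN_BUFFER_SIZE - 1) & ~(MIN_BUFFER_SIZE - 1);  // round up to whole pages
            return (int) Math.min(size, 1 << 16);                          // cap at 64 KiB
        }
    }

    public class SsdDiskOptimizationStrategy implements DiskOptimizationStrategy
    {
        private final double pageCrossChance;  // conf.disk_optimization_page_cross_chance

        public SsdDiskOptimizationStrategy(double pageCrossChance)
        {
            this.pageCrossChance = pageCrossChance;
        }

        @Override
        public int bufferSize(long recordSize)
        {
            // assuming records start uniformly within a page, the chance of spilling into
            // one more page is (recordSize % page) / page; buy an extra page when that
            // chance exceeds the configured threshold
            double crossChance = (recordSize % MIN_BUFFER_SIZE) / (double) MIN_BUFFER_SIZE;
            if (crossChance >= pageCrossChance)
                recordSize += MIN_BUFFER_SIZE;
            return roundBufferSize(recordSize);
        }
    }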

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index dd1fdb7..d030329 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.MetricNameFactory;
@@ -229,7 +229,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         return new RowIndexEntry<>(dataFilePosition);
     }
 
-    public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+    public IndexInfoRetriever openWithIndex(FileHandle indexFile)
     {
         return null;
     }
@@ -480,7 +480,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
-        public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+        public IndexInfoRetriever openWithIndex(FileHandle indexFile)
         {
             int fieldsSize = (int) DeletionTime.serializer.serializedSize(deletionTime)
                              + TypeSizes.sizeof(0); // columnIndexCount
@@ -715,7 +715,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
-        public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+        public IndexInfoRetriever openWithIndex(FileHandle indexFile)
         {
             indexEntrySizeHistogram.update(serializedSize(deletionTime, headerLength, columnsIndex.length) + indexedPartSize);
             indexInfoCountHistogram.update(columnsIndex.length);
@@ -879,7 +879,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
-        public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+        public IndexInfoRetriever openWithIndex(FileHandle indexFile)
         {
             indexEntrySizeHistogram.update(indexedPartSize + fieldsSerializedSize);
             indexInfoCountHistogram.update(columnsIndexCount);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 005bb2c..16aa30a 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.DataPosition;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
@@ -47,7 +47,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
 
     private final boolean isForThrift;
 
-    protected final SegmentedFile ifile;
+    protected final FileHandle ifile;
 
     private boolean isClosed;
 
@@ -63,7 +63,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
                                       Slices slices,
                                       ColumnFilter columnFilter,
                                       boolean isForThrift,
-                                      SegmentedFile ifile)
+                                      FileHandle ifile)
     {
         this.sstable = sstable;
         this.ifile = ifile;
@@ -453,7 +453,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
         // Marks the beginning of the block corresponding to currentIndexIdx.
         private DataPosition mark;
 
-        public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed, SegmentedFile indexFile)
+        public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed, FileHandle indexFile)
         {
             this.reader = reader;
             this.comparator = comparator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index e4f6700..ad3732f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
 
 /**
  *  A Cell Iterator over SSTable
@@ -39,7 +39,7 @@ public class SSTableIterator extends AbstractSSTableIterator
                            Slices slices,
                            ColumnFilter columns,
                            boolean isForThrift,
-                           SegmentedFile ifile)
+                           FileHandle ifile)
     {
         super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 6fef70f..e65376e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.utils.btree.BTree;
 
 /**
@@ -42,7 +42,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                                    Slices slices,
                                    ColumnFilter columns,
                                    boolean isForThrift,
-                                   SegmentedFile ifile)
+                                   FileHandle ifile)
     {
         super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index a9e9038..4594080 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -121,8 +121,7 @@ public class CommitLogReader
         // just transform from the file name (no reading of headers) to determine version
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 
-        try(ChannelProxy channel = new ChannelProxy(file);
-            RandomAccessReader reader = RandomAccessReader.open(channel))
+        try(RandomAccessReader reader = RandomAccessReader.open(file))
         {
             if (desc.version < CommitLogDescriptor.VERSION_21)
             {
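
With channel handling folded into the reader, RandomAccessReader.open(File) now creates and owns its ChannelProxy, so call sites like the one above need a single try-with-resources instead of two. A minimal usage sketch (the path is invented for illustration):

    // Closing the reader also closes the channel it opened; RandomAccessReader
    // implements DataInput, so typed reads go through its internal buffer.
    try (RandomAccessReader reader = RandomAccessReader.open(new File("/var/lib/cassandra/commitlog/CommitLog-6-1.log")))
    {
        int magic = reader.readInt();
    }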

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 7840985..13611a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -27,11 +27,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
@@ -63,26 +61,17 @@ public class Descriptor
     private final int hashCode;
 
     /**
-     * A descriptor that assumes CURRENT_VERSION.
-     */
-    @VisibleForTesting
-    public Descriptor(File directory, String ksname, String cfname, int generation)
-    {
-        this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, DatabaseDescriptor.getSSTableFormat(), null);
-    }
-
-    /**
      * Constructor for sstable writers only.
      */
     public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
-        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType()));
+        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
     }
 
     @VisibleForTesting
     public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
-        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType()));
+        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType()));
     }
 
     public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 923ef82..601f5a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -68,12 +69,9 @@ public abstract class SSTable
     public DecoratedKey first;
     public DecoratedKey last;
 
-    protected SSTable(Descriptor descriptor, CFMetaData metadata)
-    {
-        this(descriptor, new HashSet<>(), metadata);
-    }
+    protected final DiskOptimizationStrategy optimizationStrategy;
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata)
+    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, DiskOptimizationStrategy optimizationStrategy)
     {
         // In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without
         // full schema definition. SSTableLoader use that ability
@@ -86,6 +84,7 @@ public abstract class SSTable
         this.compression = dataComponents.contains(Component.COMPRESSION_INFO);
         this.components = new CopyOnWriteArraySet<>(dataComponents);
         this.metadata = metadata;
+        this.optimizationStrategy = Objects.requireNonNull(optimizationStrategy);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 91b71cc..78a6825 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -37,11 +37,14 @@ import org.slf4j.LoggerFactory;
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
+
+import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
@@ -190,8 +193,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public final UniqueIdentifier instanceId = new UniqueIdentifier();
 
     // indexfile and datafile: might be null before a call to load()
-    protected SegmentedFile ifile;
-    protected SegmentedFile dfile;
+    protected FileHandle ifile;
+    protected FileHandle dfile;
     protected IndexSummary indexSummary;
     protected IFilter bf;
 
@@ -430,16 +433,20 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                              OpenReason.NORMAL,
                                              header == null? null : header.toHeader(metadata));
 
-        // special implementation of load to use non-pooled SegmentedFile builders
-        try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
-            SegmentedFile.Builder dbuilder = sstable.compression
-                ? new CompressedSegmentedFile.Builder(null)
-                : new BufferedSegmentedFile.Builder())
+        try(FileHandle.Builder ibuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))
+                                                     .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                                                     .withChunkCache(ChunkCache.instance);
+            FileHandle.Builder dbuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.DATA)).compressed(sstable.compression)
+                                                     .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+                                                     .withChunkCache(ChunkCache.instance))
         {
-            if (!sstable.loadSummary(ibuilder, dbuilder))
+            if (!sstable.loadSummary())
                 sstable.buildSummary(false, false, Downsampling.BASE_SAMPLING_LEVEL);
-            sstable.ifile = ibuilder.buildIndex(sstable.descriptor, sstable.indexSummary);
-            sstable.dfile = dbuilder.buildData(sstable.descriptor, statsMetadata);
+            long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+            int dataBufferSize = sstable.optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+            int indexBufferSize = sstable.optimizationStrategy.bufferSize(indexFileLength / sstable.indexSummary.size());
+            sstable.ifile = ibuilder.bufferSize(sstable.optimizationStrategy.bufferSize(indexBufferSize)).complete();
+            sstable.dfile = dbuilder.bufferSize(sstable.optimizationStrategy.bufferSize(dataBufferSize)).complete();
             sstable.bf = FilterFactory.AlwaysPresent;
             sstable.setup(false);
             return sstable;
@@ -578,8 +585,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public static SSTableReader internalOpen(Descriptor desc,
                                       Set<Component> components,
                                       CFMetaData metadata,
-                                      SegmentedFile ifile,
-                                      SegmentedFile dfile,
+                                      FileHandle ifile,
+                                      FileHandle dfile,
                                       IndexSummary isummary,
                                       IFilter bf,
                                       long maxDataAge,
@@ -622,7 +629,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                             OpenReason openReason,
                             SerializationHeader header)
     {
-        super(desc, components, metadata);
+        super(desc, components, metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
         this.sstableMetadata = sstableMetadata;
         this.header = header;
         this.maxDataAge = maxDataAge;
@@ -730,10 +737,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      */
     private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
     {
-        try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
-            SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
+        try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+                                                     .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                                                     .withChunkCache(ChunkCache.instance);
+            FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+                                                     .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+                                                     .withChunkCache(ChunkCache.instance))
         {
-            boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+            boolean summaryLoaded = loadSummary();
             boolean builtSummary = false;
             if (recreateBloomFilter || !summaryLoaded)
             {
@@ -741,13 +752,19 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                 builtSummary = true;
             }
 
+            long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+            int dataBufferSize = optimizationStrategy.bufferSize(sstableMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+
             if (components.contains(Component.PRIMARY_INDEX))
-                ifile = ibuilder.buildIndex(descriptor, indexSummary);
+            {
+                int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+                ifile = ibuilder.bufferSize(indexBufferSize).complete();
+            }
 
-            dfile = dbuilder.buildData(descriptor, sstableMetadata);
+            dfile = dbuilder.bufferSize(dataBufferSize).complete();
 
             if (saveSummaryIfCreated && builtSummary)
-                saveSummary(ibuilder, dbuilder);
+                saveSummary();
         }
         catch (Throwable t)
         { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
@@ -835,12 +852,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * if loaded index summary has different index interval from current value stored in schema,
      * then Summary.db file will be deleted and this returns false to rebuild summary.
      *
-     * @param ibuilder
-     * @param dbuilder
      * @return true if index summary is loaded successfully from Summary.db file.
      */
     @SuppressWarnings("resource")
-    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    public boolean loadSummary()
     {
         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (!summariesFile.exists())
@@ -855,8 +870,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
             first = decorateKey(ByteBufferUtil.readWithLength(iStream));
             last = decorateKey(ByteBufferUtil.readWithLength(iStream));
-            ibuilder.deserializeBounds(iStream, descriptor.version);
-            dbuilder.deserializeBounds(iStream, descriptor.version);
         }
         catch (IOException e)
         {
@@ -879,25 +892,22 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     /**
      * Save index summary to Summary.db file.
-     *
-     * @param ibuilder
-     * @param dbuilder
      */
 
-    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    public void saveSummary()
     {
-        saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, indexSummary);
+        saveSummary(this.descriptor, this.first, this.last, indexSummary);
     }
 
-    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary newSummary)
+    private void saveSummary(IndexSummary newSummary)
     {
-        saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, newSummary);
+        saveSummary(this.descriptor, this.first, this.last, newSummary);
     }
+
     /**
      * Save index summary to Summary.db file.
      */
-    public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last,
-                                   SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+    public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last, IndexSummary summary)
     {
         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (summariesFile.exists())
@@ -908,8 +918,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
             ByteBufferUtil.writeWithLength(last.getKey(), oStream);
-            ibuilder.serializeBounds(oStream, descriptor.version);
-            dbuilder.serializeBounds(oStream, descriptor.version);
         }
         catch (IOException e)
         {
@@ -1044,13 +1052,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     private static class DropPageCache implements Runnable
     {
-        final SegmentedFile dfile;
+        final FileHandle dfile;
         final long dfilePosition;
-        final SegmentedFile ifile;
+        final FileHandle ifile;
         final long ifilePosition;
         final Runnable andThen;
 
-        private DropPageCache(SegmentedFile dfile, long dfilePosition, SegmentedFile ifile, long ifilePosition, Runnable andThen)
+        private DropPageCache(FileHandle dfile, long dfilePosition, FileHandle ifile, long ifilePosition, Runnable andThen)
         {
             this.dfile = dfile;
             this.dfilePosition = dfilePosition;
@@ -1114,12 +1122,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                         "no adjustments to min/max_index_interval");
             }
 
-            //Always save the resampled index
-            try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
-                SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression))
-            {
-                saveSummary(ibuilder, dbuilder, newSummary);
-            }
+            // Always save the resampled index
+            saveSummary(newSummary);
 
             long newSize = bytesOnDisk();
             StorageMetrics.load.inc(newSize - oldSize);
@@ -1241,7 +1245,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         if (!compression)
             throw new IllegalStateException(this + " is not compressed");
 
-        return ((ICompressedFile) dfile).getMetadata();
+        return dfile.compressionMetadata().get();
     }
 
     /**
@@ -1596,9 +1600,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public void setCrcCheckChance(double crcCheckChance)
     {
         this.crcCheckChance = crcCheckChance;
-        if (compression)
-            ((CompressedSegmentedFile)dfile).metadata.parameters.setCrcCheckChance(crcCheckChance);
-
+        dfile.compressionMetadata().ifPresent(metadata -> metadata.parameters.setCrcCheckChance(crcCheckChance));
     }
 
     /**
@@ -1946,7 +1948,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return ifile.channel;
     }
 
-    public SegmentedFile getIndexFile()
+    public FileHandle getIndexFile()
     {
         return ifile;
     }
@@ -2083,8 +2085,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         private IFilter bf;
         private IndexSummary summary;
 
-        private SegmentedFile dfile;
-        private SegmentedFile ifile;
+        private FileHandle dfile;
+        private FileHandle ifile;
         private Runnable runOnClose;
         private boolean isReplaced = false;
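
A note on the sizing arithmetic that replaces buildIndex/buildData in the hunks above: the data file buffer is sized for the configured percentile of the estimated partition size distribution, while the index file buffer is sized for the average bytes per index summary entry. Condensed restatement, not a verbatim excerpt:

    // data buffer: fit a "typical" partition, per disk_optimization_estimate_percentile
    int dataBufferSize = optimizationStrategy.bufferSize(
        sstableMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));

    // index buffer: fit the average run of index entries covered by one summary entry
    long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
    int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());

    ifile = ibuilder.bufferSize(indexBufferSize).complete();
    dfile = dbuilder.bufferSize(dataBufferSize).complete();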
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 9f2e159..9fb5f7c 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
@@ -79,7 +80,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                             SerializationHeader header,
                             Collection<SSTableFlushObserver> observers)
     {
-        super(descriptor, components(metadata), metadata);
+        super(descriptor, components(metadata), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.metadataCollector = metadataCollector;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index c1d9bbc..26b1543 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -23,12 +23,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,11 +56,12 @@ public class BigTableWriter extends SSTableWriter
 
     private final ColumnIndex columnIndexWriter;
     private final IndexWriter iwriter;
-    private final SegmentedFile.Builder dbuilder;
+    private final FileHandle.Builder dbuilder;
     protected final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
     private DataPosition dataMark;
     private long lastEarlyOpenLength = 0;
+    private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance);
 
     private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
                                                         .trickleFsync(DatabaseDescriptor.getTrickleFsync())
@@ -85,16 +88,17 @@ public class BigTableWriter extends SSTableWriter
                                              writerOption,
                                              metadata.params.compression,
                                              metadataCollector);
-            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
         }
         else
         {
             dataFile = new ChecksummedSequentialWriter(new File(getFilename()),
-                                                       new File(descriptor.filenameFor(Component.CRC)),
-                                                       new File(descriptor.filenameFor(descriptor.digestComponent)),
-                                                       writerOption);
-            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
+                    new File(descriptor.filenameFor(Component.CRC)),
+                    new File(descriptor.filenameFor(descriptor.digestComponent)),
+                    writerOption);
         }
+        dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+                                              .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+        chunkCache.ifPresent(dbuilder::withChunkCache);
         iwriter = new IndexWriter(keyCount);
 
         columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer());
@@ -277,8 +281,13 @@ public class BigTableWriter extends SSTableWriter
         assert boundary.indexLength > 0 && boundary.dataLength > 0;
         // open the reader early
         IndexSummary indexSummary = iwriter.summary.build(metadata.partitioner, boundary);
-        SegmentedFile ifile = iwriter.builder.buildIndex(descriptor, indexSummary, boundary);
-        SegmentedFile dfile = dbuilder.buildData(descriptor, stats, boundary);
+        long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+        int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+        FileHandle ifile = iwriter.builder.bufferSize(optimizationStrategy.bufferSize(indexBufferSize)).complete(boundary.indexLength);
+        if (compression)
+            dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(boundary.dataLength));
+        int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+        FileHandle dfile = dbuilder.bufferSize(optimizationStrategy.bufferSize(dataBufferSize)).complete(boundary.dataLength);
         invalidateCacheAtBoundary(dfile);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor,
                                                            components, metadata,
@@ -291,10 +300,12 @@ public class BigTableWriter extends SSTableWriter
         return sstable;
     }
 
-    void invalidateCacheAtBoundary(SegmentedFile dfile)
+    void invalidateCacheAtBoundary(FileHandle dfile)
     {
-        if (ChunkCache.instance != null && lastEarlyOpenLength != 0 && dfile.dataLength() > lastEarlyOpenLength)
-            ChunkCache.instance.invalidatePosition(dfile, lastEarlyOpenLength);
+        chunkCache.ifPresent(cache -> {
+            if (lastEarlyOpenLength != 0 && dfile.dataLength() > lastEarlyOpenLength)
+                cache.invalidatePosition(dfile, lastEarlyOpenLength);
+        });
         lastEarlyOpenLength = dfile.dataLength();
     }
 
@@ -304,11 +315,11 @@ public class BigTableWriter extends SSTableWriter
         dataFile.sync();
         iwriter.indexFile.sync();
 
-        return openFinal(descriptor, SSTableReader.OpenReason.EARLY);
+        return openFinal(SSTableReader.OpenReason.EARLY);
     }
 
     @SuppressWarnings("resource")
-    private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
+    private SSTableReader openFinal(SSTableReader.OpenReason openReason)
     {
         if (maxDataAge < 0)
             maxDataAge = System.currentTimeMillis();
@@ -316,10 +327,15 @@ public class BigTableWriter extends SSTableWriter
         StatsMetadata stats = statsMetadata();
         // finalize in-memory state for the reader
         IndexSummary indexSummary = iwriter.summary.build(this.metadata.partitioner);
-        SegmentedFile ifile = iwriter.builder.buildIndex(desc, indexSummary);
-        SegmentedFile dfile = dbuilder.buildData(desc, stats);
+        long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+        int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+        int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
+        FileHandle ifile = iwriter.builder.bufferSize(optimizationStrategy.bufferSize(indexBufferSize)).complete();
+        if (compression)
+            dbuilder.withCompressionMetadata(((CompressedSequentialWriter) dataFile).open(0));
+        FileHandle dfile = dbuilder.bufferSize(optimizationStrategy.bufferSize(dataBufferSize)).complete();
         invalidateCacheAtBoundary(dfile);
-        SSTableReader sstable = SSTableReader.internalOpen(desc,
+        SSTableReader sstable = SSTableReader.internalOpen(descriptor,
                                                            components,
                                                            this.metadata,
                                                            ifile,
@@ -355,7 +371,7 @@ public class BigTableWriter extends SSTableWriter
             SSTable.appendTOC(descriptor, components);
 
             if (openResult)
-                finalReader = openFinal(descriptor, SSTableReader.OpenReason.NORMAL);
+                finalReader = openFinal(SSTableReader.OpenReason.NORMAL);
         }
 
         protected Throwable doCommit(Throwable accumulate)
@@ -415,7 +431,7 @@ public class BigTableWriter extends SSTableWriter
     class IndexWriter extends AbstractTransactional implements Transactional
     {
         private final SequentialWriter indexFile;
-        public final SegmentedFile.Builder builder;
+        public final FileHandle.Builder builder;
         public final IndexSummaryBuilder summary;
         public final IFilter bf;
         private DataPosition mark;
@@ -423,7 +439,8 @@ public class BigTableWriter extends SSTableWriter
         IndexWriter(long keyCount)
         {
             indexFile = new SequentialWriter(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), writerOption);
-            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
+            builder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+            chunkCache.ifPresent(builder::withChunkCache);
             summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
             bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder());
             // register listeners to be alerted when the data files are flushed
@@ -507,7 +524,7 @@ public class BigTableWriter extends SSTableWriter
             summary.prepareToCommit();
             try (IndexSummary indexSummary = summary.build(getPartitioner()))
             {
-                SSTableReader.saveSummary(descriptor, first, last, builder, dbuilder, indexSummary);
+                SSTableReader.saveSummary(descriptor, first, last, indexSummary);
             }
         }
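
BigTableWriter now captures the possibly-null ChunkCache singleton once in an Optional, so each former null check becomes an ifPresent call, as in invalidateCacheAtBoundary above. The same shape in isolation (helper method name hypothetical):

    private final Optional<ChunkCache> chunkCache = Optional.ofNullable(ChunkCache.instance);

    // hypothetical illustration of the pattern used above
    private void attachCache(FileHandle.Builder builder)
    {
        chunkCache.ifPresent(builder::withChunkCache);  // no-op when the chunk cache is disabled
    }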
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java b/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
index 5dc0d37..7962c0f 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractReaderFileProxy.java
@@ -23,7 +23,7 @@ public abstract class AbstractReaderFileProxy implements ReaderFileProxy
     protected final ChannelProxy channel;
     protected final long fileLength;
 
-    public AbstractReaderFileProxy(ChannelProxy channel, long fileLength)
+    protected AbstractReaderFileProxy(ChannelProxy channel, long fileLength)
     {
         this.channel = channel;
         this.fileLength = fileLength >= 0 ? fileLength : channel.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
index 95af31f..1648bcf 100644
--- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.io.util;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.cassandra.utils.memory.BufferPool;
 
@@ -36,19 +37,12 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer
     protected final ByteBuffer buffer;
     protected long offset = 0;
 
-    public static BufferManagingRebufferer on(ChunkReader wrapped)
-    {
-        return wrapped.alignmentRequired()
-             ? new Aligned(wrapped)
-             : new Unaligned(wrapped);
-    }
-
     abstract long alignedPosition(long position);
 
-    public BufferManagingRebufferer(ChunkReader wrapped)
+    protected BufferManagingRebufferer(ChunkReader wrapped)
     {
         this.source = wrapped;
-        buffer = RandomAccessReader.allocateBuffer(wrapped.chunkSize(), wrapped.preferredBufferType());
+        buffer = BufferPool.get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN);
         buffer.limit(0);
     }
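
The rebufferer now takes its buffer straight from BufferPool and pins it to big-endian, presumably because the DataInput-style reads layered on top assume network byte order while a recycled pooled buffer could arrive with whatever order its previous user set. A standalone JDK illustration of the hazard (not Cassandra code):

    ByteBuffer b = ByteBuffer.allocate(8);
    b.order(ByteOrder.LITTLE_ENDIAN);  // a previous user flips the order
    b.order(ByteOrder.BIG_ENDIAN);     // what the constructor above restores
    b.putLong(0, 1L);
    assert b.get(7) == 1;              // big-endian: least-significant byte comes last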
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
deleted file mode 100644
index a46ec14..0000000
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import org.apache.cassandra.cache.ChunkCache;
-import org.apache.cassandra.io.compress.BufferType;
-
-public class BufferedSegmentedFile extends SegmentedFile
-{
-    public BufferedSegmentedFile(ChannelProxy channel, int bufferSize, long length)
-    {
-        this(channel, createRebufferer(channel, length, bufferSize), length);
-    }
-
-    private BufferedSegmentedFile(ChannelProxy channel, RebuffererFactory rebufferer, long length)
-    {
-        super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
-    }
-
-    private BufferedSegmentedFile(BufferedSegmentedFile copy)
-    {
-        super(copy);
-    }
-
-    private static RebuffererFactory createRebufferer(ChannelProxy channel, long length, int bufferSize)
-    {
-        return ChunkCache.maybeWrap(new SimpleChunkReader(channel, length, BufferType.OFF_HEAP, bufferSize));
-    }
-
-    public static class Builder extends SegmentedFile.Builder
-    {
-        public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
-        {
-            long length = overrideLength > 0 ? overrideLength : channel.size();
-            return new BufferedSegmentedFile(channel, bufferSize, length);
-        }
-    }
-
-    public BufferedSegmentedFile sharedCopy()
-    {
-        return new BufferedSegmentedFile(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 25ef615..e1f795a 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -20,101 +20,26 @@ package org.apache.cassandra.io.util;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ChecksumType;
 
-public class ChecksummedRandomAccessReader
+public final class ChecksummedRandomAccessReader
 {
-    @SuppressWarnings("serial")
-    public static class CorruptFileException extends RuntimeException
+    public static RandomAccessReader open(File file, File crcFile) throws IOException
     {
-        public final String filePath;
-
-        public CorruptFileException(Exception cause, String filePath)
-        {
-            super(cause);
-            this.filePath = filePath;
-        }
-    }
-
-    static class ChecksummedRebufferer extends BufferManagingRebufferer
-    {
-        private final DataIntegrityMetadata.ChecksumValidator validator;
-
-        public ChecksummedRebufferer(ChannelProxy channel, ChecksumValidator validator)
-        {
-            super(new SimpleChunkReader(channel, channel.size(), BufferType.ON_HEAP, validator.chunkSize));
-            this.validator = validator;
-        }
-
-        @Override
-        public BufferHolder rebuffer(long desiredPosition)
-        {
-            if (desiredPosition != offset + buffer.position())
-                validator.seek(desiredPosition);
-
-            // align with buffer size, as checksums were computed in chunks of buffer size each.
-            offset = alignedPosition(desiredPosition);
-            source.readChunk(offset, buffer);
-
-            try
-            {
-                validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
-            }
-            catch (IOException e)
-            {
-                throw new CorruptFileException(e, channel().filePath());
-            }
-
-            return this;
-        }
-
-        @Override
-        public void close()
+        ChannelProxy channel = new ChannelProxy(file);
+        try
         {
-            try
-            {
-                source.close();
-            }
-            finally
-            {
-                validator.close();
-            }
-        }
-
-        @Override
-        long alignedPosition(long desiredPosition)
-        {
-            return (desiredPosition / buffer.capacity()) * buffer.capacity();
-        }
-    }
-
-    public static final class Builder extends RandomAccessReader.Builder
-    {
-        private final DataIntegrityMetadata.ChecksumValidator validator;
-
-        @SuppressWarnings("resource")
-        public Builder(File file, File crcFile) throws IOException
-        {
-            super(new ChannelProxy(file));
-            this.validator = new DataIntegrityMetadata.ChecksumValidator(ChecksumType.CRC32,
-                                                                         RandomAccessReader.open(crcFile),
-                                                                         file.getPath());
-        }
-
-        @Override
-        protected Rebufferer createRebufferer()
-        {
-            return new ChecksummedRebufferer(channel, validator);
+            DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(ChecksumType.CRC32,
+                                                                                                            RandomAccessReader.open(crcFile),
+                                                                                                            file.getPath());
+            Rebufferer rebufferer = new ChecksummedRebufferer(channel, validator);
+            // Always own and close the channel.
+            return new RandomAccessReader.RandomAccessReaderWithOwnChannel(rebufferer);
         }
-
-        @Override
-        public RandomAccessReader build()
+        catch (Throwable t)
         {
-            // Always own and close the channel.
-            return buildWithChannel();
+            channel.close();
+            throw t;
         }
     }
 }
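
The change above collapses the old Builder into a single static factory. A minimal caller
sketch, assuming the data file and its companion CRC file already exist on disk (the file
names and read size are illustrative):

    import java.io.File;
    import java.io.IOException;
    import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
    import org.apache.cassandra.io.util.RandomAccessReader;

    public class ChecksummedReadExample
    {
        public static void main(String[] args) throws IOException
        {
            // The returned reader owns its channel, so closing the reader closes the file.
            try (RandomAccessReader reader = ChecksummedRandomAccessReader.open(new File("ks-tbl-data.db"),
                                                                                new File("ks-tbl-data.crc")))
            {
                byte[] header = new byte[32];
                reader.readFully(header); // throws CorruptFileException on a checksum mismatch
            }
        }
    }

Because validation happens chunk by chunk inside the rebufferer, corruption surfaces on the
first read that touches the bad chunk rather than at open time.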

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java b/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java
new file mode 100644
index 0000000..bc8695f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class ChecksummedRebufferer extends BufferManagingRebufferer
+{
+    private final DataIntegrityMetadata.ChecksumValidator validator;
+
+    ChecksummedRebufferer(ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator)
+    {
+        super(new SimpleChunkReader(channel, channel.size(), BufferType.ON_HEAP, validator.chunkSize));
+        this.validator = validator;
+    }
+
+    @Override
+    public BufferHolder rebuffer(long desiredPosition)
+    {
+        if (desiredPosition != offset + buffer.position())
+            validator.seek(desiredPosition);
+
+        // align to the buffer size, as each checksum was computed over one buffer-sized chunk.
+        offset = alignedPosition(desiredPosition);
+        source.readChunk(offset, buffer);
+
+        try
+        {
+            validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+        }
+        catch (IOException e)
+        {
+            throw new CorruptFileException(e, channel().filePath());
+        }
+
+        return this;
+    }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            source.close();
+        }
+        finally
+        {
+            validator.close();
+        }
+    }
+
+    @Override
+    long alignedPosition(long desiredPosition)
+    {
+        return (desiredPosition / buffer.capacity()) * buffer.capacity();
+    }
+}
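
ChecksummedRebufferer always realigns reads to a chunk boundary because each checksum in the
CRC file covers exactly one buffer-sized chunk. A small illustration of the arithmetic in
alignedPosition (chunk size and position are illustrative values):

    long chunkSize = 4096;                            // validator.chunkSize sizes the buffer
    long desired = 10_000;                            // a position inside the third chunk
    long aligned = (desired / chunkSize) * chunkSize; // integer division truncates to the chunk start
    assert aligned == 8192;                           // the whole chunk is read, then validated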

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChunkReader.java b/src/java/org/apache/cassandra/io/util/ChunkReader.java
index a04299a..1d3439e 100644
--- a/src/java/org/apache/cassandra/io/util/ChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChunkReader.java
@@ -44,11 +44,6 @@ public interface ChunkReader extends RebuffererFactory
     int chunkSize();
 
     /**
-     * If true, positions passed to this rebufferer must be aligned to chunkSize.
-     */
-    boolean alignmentRequired();
-
-    /**
      * Specifies type of buffer the caller should attempt to give.
      * This is not guaranteed to be fulfilled.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
new file mode 100644
index 0000000..5f8751a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.compress.CorruptBlockException;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+
+public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
+{
+    final CompressionMetadata metadata;
+
+    protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
+    {
+        super(channel, metadata.dataLength);
+        this.metadata = metadata;
+        assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
+    }
+
+    @VisibleForTesting
+    public double getCrcCheckChance()
+    {
+        return metadata.parameters.getCrcCheckChance();
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)",
+                             getClass().getSimpleName(),
+                             channel.filePath(),
+                             metadata.compressor().getClass().getSimpleName(),
+                             metadata.chunkLength(),
+                             metadata.dataLength);
+    }
+
+    @Override
+    public int chunkSize()
+    {
+        return metadata.chunkLength();
+    }
+
+    @Override
+    public BufferType preferredBufferType()
+    {
+        return metadata.compressor().preferredBufferType();
+    }
+
+    @Override
+    public Rebufferer instantiateRebufferer()
+    {
+        return new BufferManagingRebufferer.Aligned(this);
+    }
+
+    public static class Standard extends CompressedChunkReader
+    {
+        // we read the raw compressed bytes into this buffer, then uncompress them into the provided one.
+        private final ThreadLocal<ByteBuffer> compressedHolder;
+
+        public Standard(ChannelProxy channel, CompressionMetadata metadata)
+        {
+            super(channel, metadata);
+            compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
+        }
+
+        public ByteBuffer allocateBuffer()
+        {
+            return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
+        }
+
+        public ByteBuffer allocateBuffer(int size)
+        {
+            return metadata.compressor().preferredBufferType().allocate(size);
+        }
+
+        @Override
+        public void readChunk(long position, ByteBuffer uncompressed)
+        {
+            try
+            {
+                // accesses must always be aligned
+                assert (position & -uncompressed.capacity()) == position;
+                assert position <= fileLength;
+
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+                ByteBuffer compressed = compressedHolder.get();
+
+                if (compressed.capacity() < chunk.length)
+                {
+                    compressed = allocateBuffer(chunk.length);
+                    compressedHolder.set(compressed);
+                }
+                else
+                {
+                    compressed.clear();
+                }
+
+                compressed.limit(chunk.length);
+                if (channel.read(compressed, chunk.offset) != chunk.length)
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+
+                compressed.flip();
+                uncompressed.clear();
+
+                try
+                {
+                    metadata.compressor().uncompress(compressed, uncompressed);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+                finally
+                {
+                    uncompressed.flip();
+                }
+
+                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+                {
+                    compressed.rewind();
+                    int checksum = (int) metadata.checksumType.of(compressed);
+
+                    compressed.clear().limit(Integer.BYTES);
+                    if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
+                                || compressed.getInt(0) != checksum)
+                        throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+            }
+            catch (CorruptBlockException e)
+            {
+                throw new CorruptSSTableException(e, channel.filePath());
+            }
+        }
+    }
+
+    public static class Mmap extends CompressedChunkReader
+    {
+        protected final MmappedRegions regions;
+
+        public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+        {
+            super(channel, metadata);
+            this.regions = regions;
+        }
+
+        @Override
+        public void readChunk(long position, ByteBuffer uncompressed)
+        {
+            try
+            {
+                // accesses must always be aligned
+                assert (position & -uncompressed.capacity()) == position;
+                assert position <= fileLength;
+
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+                MmappedRegions.Region region = regions.floor(chunk.offset);
+                long segmentOffset = region.offset();
+                int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+                ByteBuffer compressedChunk = region.buffer();
+
+                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+                uncompressed.clear();
+
+                try
+                {
+                    metadata.compressor().uncompress(compressedChunk, uncompressed);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+                finally
+                {
+                    uncompressed.flip();
+                }
+
+                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+                {
+                    compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+                    int checksum = (int) metadata.checksumType.of(compressedChunk);
+
+                    compressedChunk.limit(compressedChunk.capacity());
+                    if (compressedChunk.getInt() != checksum)
+                        throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+            }
+            catch (CorruptBlockException e)
+            {
+                throw new CorruptSSTableException(e, channel.filePath());
+            }
+        }
+
+        public void close()
+        {
+            regions.closeQuietly();
+            super.close();
+        }
+    }
+}
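
Both readers depend on the power-of-two chunk length asserted in the constructor: in two's
complement, negating a power of two yields a mask of its multiples, so the aligned-access
assertion is a single AND. A small demonstration of the trick (the values are illustrative):

    int capacity = 65536;                    // chunk length; Integer.bitCount(capacity) == 1
    long aligned = 131072;                   // an exact multiple of the capacity
    long stray = 131073;                     // one byte past an alignment boundary
    assert (aligned & -capacity) == aligned; // low bits already zero: accepted
    assert (stray & -capacity) != stray;     // mask clears the low bits: rejected

Note also that the checksum verification in both Standard and Mmap runs only when
getCrcCheckChance() beats the random draw, so CRC cost scales with the configured check
chance rather than with every read.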

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
deleted file mode 100644
index 7365d40..0000000
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ThreadLocalRandom;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Ints;
-
-import org.apache.cassandra.cache.ChunkCache;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Config.DiskAccessMode;
-import org.apache.cassandra.io.compress.*;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.concurrent.Ref;
-
-public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
-{
-    public final CompressionMetadata metadata;
-
-    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, Config.DiskAccessMode mode)
-    {
-        this(channel,
-             metadata,
-             mode == DiskAccessMode.mmap
-             ? MmappedRegions.map(channel, metadata)
-             : null);
-    }
-
-    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
-    {
-        this(channel, metadata, regions, createRebufferer(channel, metadata, regions));
-    }
-
-    private static RebuffererFactory createRebufferer(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
-    {
-        return ChunkCache.maybeWrap(chunkReader(channel, metadata, regions));
-    }
-
-    public static ChunkReader chunkReader(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
-    {
-        return regions != null
-               ? new Mmap(channel, metadata, regions)
-               : new Standard(channel, metadata);
-    }
-
-    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, RebuffererFactory rebufferer)
-    {
-        super(new Cleanup(channel, metadata, regions, rebufferer), channel, rebufferer, metadata.compressedFileLength);
-        this.metadata = metadata;
-    }
-
-    private CompressedSegmentedFile(CompressedSegmentedFile copy)
-    {
-        super(copy);
-        this.metadata = copy.metadata;
-    }
-
-    public ChannelProxy channel()
-    {
-        return channel;
-    }
-
-    private static final class Cleanup extends SegmentedFile.Cleanup
-    {
-        final CompressionMetadata metadata;
-
-        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, ReaderFileProxy rebufferer)
-        {
-            super(channel, rebufferer);
-            this.metadata = metadata;
-        }
-        public void tidy()
-        {
-            if (ChunkCache.instance != null)
-            {
-                ChunkCache.instance.invalidateFile(name());
-            }
-            metadata.close();
-
-            super.tidy();
-        }
-    }
-
-    public CompressedSegmentedFile sharedCopy()
-    {
-        return new CompressedSegmentedFile(this);
-    }
-
-    public void addTo(Ref.IdentityCollection identities)
-    {
-        super.addTo(identities);
-        metadata.addTo(identities);
-    }
-
-    public static class Builder extends SegmentedFile.Builder
-    {
-        final CompressedSequentialWriter writer;
-        final Config.DiskAccessMode mode;
-
-        public Builder(CompressedSequentialWriter writer)
-        {
-            this.writer = writer;
-            this.mode = DatabaseDescriptor.getDiskAccessMode();
-        }
-
-        protected CompressionMetadata metadata(String path, long overrideLength)
-        {
-            if (writer == null)
-                return CompressionMetadata.create(path);
-
-            return writer.open(overrideLength);
-        }
-
-        public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
-        {
-            return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength), mode);
-        }
-    }
-
-    public void dropPageCache(long before)
-    {
-        if (before >= metadata.dataLength)
-            super.dropPageCache(0);
-        super.dropPageCache(metadata.chunkFor(before).offset);
-    }
-
-    public CompressionMetadata getMetadata()
-    {
-        return metadata;
-    }
-
-    public long dataLength()
-    {
-        return metadata.dataLength;
-    }
-
-    @VisibleForTesting
-    public abstract static class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
-    {
-        final CompressionMetadata metadata;
-
-        public CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
-        {
-            super(channel, metadata.dataLength);
-            this.metadata = metadata;
-            assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
-        }
-
-        @VisibleForTesting
-        public double getCrcCheckChance()
-        {
-            return metadata.parameters.getCrcCheckChance();
-        }
-
-        @Override
-        public String toString()
-        {
-            return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)",
-                                 getClass().getSimpleName(),
-                                 channel.filePath(),
-                                 metadata.compressor().getClass().getSimpleName(),
-                                 metadata.chunkLength(),
-                                 metadata.dataLength);
-        }
-
-        @Override
-        public int chunkSize()
-        {
-            return metadata.chunkLength();
-        }
-
-        @Override
-        public boolean alignmentRequired()
-        {
-            return true;
-        }
-
-        @Override
-        public BufferType preferredBufferType()
-        {
-            return metadata.compressor().preferredBufferType();
-        }
-
-        @Override
-        public Rebufferer instantiateRebufferer()
-        {
-            return BufferManagingRebufferer.on(this);
-        }
-    }
-
-    static class Standard extends CompressedChunkReader
-    {
-        // we read the raw compressed bytes into this buffer, then uncompressed them into the provided one.
-        private final ThreadLocal<ByteBuffer> compressedHolder;
-
-        public Standard(ChannelProxy channel, CompressionMetadata metadata)
-        {
-            super(channel, metadata);
-            compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
-        }
-
-        public ByteBuffer allocateBuffer()
-        {
-            return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
-        }
-
-        public ByteBuffer allocateBuffer(int size)
-        {
-            return metadata.compressor().preferredBufferType().allocate(size);
-        }
-
-        @Override
-        public void readChunk(long position, ByteBuffer uncompressed)
-        {
-            try
-            {
-                // accesses must always be aligned
-                assert (position & -uncompressed.capacity()) == position;
-                assert position <= fileLength;
-
-                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
-                ByteBuffer compressed = compressedHolder.get();
-
-                if (compressed.capacity() < chunk.length)
-                {
-                    compressed = allocateBuffer(chunk.length);
-                    compressedHolder.set(compressed);
-                }
-                else
-                {
-                    compressed.clear();
-                }
-
-                compressed.limit(chunk.length);
-                if (channel.read(compressed, chunk.offset) != chunk.length)
-                    throw new CorruptBlockException(channel.filePath(), chunk);
-
-                compressed.flip();
-                uncompressed.clear();
-
-                try
-                {
-                    metadata.compressor().uncompress(compressed, uncompressed);
-                }
-                catch (IOException e)
-                {
-                    throw new CorruptBlockException(channel.filePath(), chunk);
-                }
-                finally
-                {
-                    uncompressed.flip();
-                }
-
-                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
-                {
-                    compressed.rewind();
-                    int checksum = (int) metadata.checksumType.of(compressed);
-
-                    compressed.clear().limit(Integer.BYTES);
-                    if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
-                        || compressed.getInt(0) != checksum)
-                        throw new CorruptBlockException(channel.filePath(), chunk);
-                }
-            }
-            catch (CorruptBlockException e)
-            {
-                throw new CorruptSSTableException(e, channel.filePath());
-            }
-        }
-    }
-
-    static class Mmap extends CompressedChunkReader
-    {
-        protected final MmappedRegions regions;
-
-        public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
-        {
-            super(channel, metadata);
-            this.regions = regions;
-        }
-
-        @Override
-        public void readChunk(long position, ByteBuffer uncompressed)
-        {
-            try
-            {
-                // accesses must always be aligned
-                assert (position & -uncompressed.capacity()) == position;
-                assert position <= fileLength;
-
-                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
-
-                MmappedRegions.Region region = regions.floor(chunk.offset);
-                long segmentOffset = region.offset();
-                int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
-                ByteBuffer compressedChunk = region.buffer();
-
-                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
-
-                uncompressed.clear();
-
-                try
-                {
-                    metadata.compressor().uncompress(compressedChunk, uncompressed);
-                }
-                catch (IOException e)
-                {
-                    throw new CorruptBlockException(channel.filePath(), chunk);
-                }
-                finally
-                {
-                    uncompressed.flip();
-                }
-
-                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
-                {
-                    compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
-
-                    int checksum = (int) metadata.checksumType.of(compressedChunk);
-
-                    compressedChunk.limit(compressedChunk.capacity());
-                    if (compressedChunk.getInt() != checksum)
-                        throw new CorruptBlockException(channel.filePath(), chunk);
-                }
-            }
-            catch (CorruptBlockException e)
-            {
-                throw new CorruptSSTableException(e, channel.filePath());
-            }
-
-        }
-
-        public void close()
-        {
-            regions.closeQuietly();
-            super.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/CorruptFileException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CorruptFileException.java b/src/java/org/apache/cassandra/io/util/CorruptFileException.java
new file mode 100644
index 0000000..875d06f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CorruptFileException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+@SuppressWarnings("serial")
+public class CorruptFileException extends RuntimeException
+{
+    public final String filePath;
+
+    public CorruptFileException(Exception cause, String filePath)
+    {
+        super(cause);
+        this.filePath = filePath;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java b/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java
new file mode 100644
index 0000000..e10342d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DiskOptimizationStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public interface DiskOptimizationStrategy
+{
+    // The maximum buffer size; we will never buffer more than this size. Further,
+    // when the limiter is not null, i.e. when throttling is enabled, we read exactly
+    // this size, since when throttling the intention is to eventually read everything;
+    // see CASSANDRA-8630.
+    // NOTE: this size is chosen for historical consistency, as a reasonable upper bound,
+    //       and because our BufferPool currently caps allocations at this size.
+    int MAX_BUFFER_SIZE = 1 << 16; // 64k
+
+    /**
+     * @param recordSize the size, in bytes, of the record to be read
+     * @return the buffer size to use for reading a record of the given size
+     */
+    int bufferSize(long recordSize);
+
+    /**
+     * Round up to the next multiple of 4k but no more than {@link #MAX_BUFFER_SIZE}.
+     */
+    default int roundBufferSize(long size)
+    {
+        if (size <= 0)
+            return 4096;
+
+        size = (size + 4095) & ~4095;
+        return (int)Math.min(size, MAX_BUFFER_SIZE);
+    }
+}
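
The default roundBufferSize uses the usual mask trick for rounding up to a 4k page, capped at
MAX_BUFFER_SIZE. A minimal sketch exercising it through a throwaway strategy whose bufferSize
delegates to the default rounding (the anonymous class is illustrative, not one of the
strategies shipped in this patch):

    DiskOptimizationStrategy strategy = new DiskOptimizationStrategy()
    {
        public int bufferSize(long recordSize)
        {
            return roundBufferSize(recordSize); // defer to the interface default
        }
    };

    assert strategy.bufferSize(1) == 4096;        // a tiny record still gets one full 4k page
    assert strategy.bufferSize(4096) == 4096;     // already page-aligned
    assert strategy.bufferSize(4097) == 8192;     // crossing a page boundary rounds up
    assert strategy.bufferSize(1 << 20) == 65536; // capped at MAX_BUFFER_SIZE (64k)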