Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/07/06 14:38:49 UTC

[2/3] cassandra git commit: Remove DatabaseDescriptor dependency from SegmentedFile

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/FileHandle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java
new file mode 100644
index 0000000..62fe5e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
+/**
+ * {@link FileHandle} provides access to a file for reading, including files written by the various {@link SequentialWriter}
+ * implementations, and is typically used by {@link org.apache.cassandra.io.sstable.format.SSTableReader}.
+ *
+ * Use {@link FileHandle.Builder} to create an instance, and call {@link #createReader()} (and its variants) to
+ * access readers for the underlying file.
+ *
+ * {@link Builder#complete()} can be called several times during the builder's lifecycle with different {@code overrideLength}
+ * values (i.e. when early-opening a file that is still being written).
+ * For that reason, the builder keeps a reference to the file channel and makes a shared copy of it for each {@link Builder#complete()} call.
+ * Therefore, it is important to close the {@link Builder} when it is no longer needed, as well as any {@link FileHandle}
+ * instances.
+ */
+public class FileHandle extends SharedCloseableImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(FileHandle.class);
+
+    public final ChannelProxy channel;
+
+    public final long onDiskLength;
+
+    /*
+     * Rebufferer factory to use when constructing RandomAccessReaders
+     */
+    private final RebuffererFactory rebuffererFactory;
+
+    /*
+     * Optional CompressionMetadata when dealing with a compressed file
+     */
+    private final Optional<CompressionMetadata> compressionMetadata;
+
+    private FileHandle(Cleanup cleanup,
+                       ChannelProxy channel,
+                       RebuffererFactory rebuffererFactory,
+                       CompressionMetadata compressionMetadata,
+                       long onDiskLength)
+    {
+        super(cleanup);
+        this.rebuffererFactory = rebuffererFactory;
+        this.channel = channel;
+        this.compressionMetadata = Optional.ofNullable(compressionMetadata);
+        this.onDiskLength = onDiskLength;
+    }
+
+    private FileHandle(FileHandle copy)
+    {
+        super(copy);
+        channel = copy.channel;
+        rebuffererFactory = copy.rebuffererFactory;
+        compressionMetadata = copy.compressionMetadata;
+        onDiskLength = copy.onDiskLength;
+    }
+
+    /**
+     * @return Path to the file this handle is referencing
+     */
+    public String path()
+    {
+        return channel.filePath();
+    }
+
+    public long dataLength()
+    {
+        return compressionMetadata.map(c -> c.dataLength).orElseGet(rebuffererFactory::fileLength);
+    }
+
+    public RebuffererFactory rebuffererFactory()
+    {
+        return rebuffererFactory;
+    }
+
+    public Optional<CompressionMetadata> compressionMetadata()
+    {
+        return compressionMetadata;
+    }
+
+    @Override
+    public void addTo(Ref.IdentityCollection identities)
+    {
+        super.addTo(identities);
+        compressionMetadata.ifPresent(metadata -> metadata.addTo(identities));
+    }
+
+    @Override
+    public FileHandle sharedCopy()
+    {
+        return new FileHandle(this);
+    }
+
+    /**
+     * Create a {@link RandomAccessReader} that reads the file using the configured access method.
+     *
+     * @return RandomAccessReader for the file
+     */
+    public RandomAccessReader createReader()
+    {
+        return createReader(null);
+    }
+
+    /**
+     * Create a {@link RandomAccessReader} that reads the file using the configured access method.
+     * Reads from the file will be rate limited by the given {@link RateLimiter}.
+     *
+     * @param limiter RateLimiter to use for rate limiting reads
+     * @return RandomAccessReader for the file
+     */
+    public RandomAccessReader createReader(RateLimiter limiter)
+    {
+        return new RandomAccessReader(instantiateRebufferer(limiter));
+    }
+
+    public FileDataInput createReader(long position)
+    {
+        RandomAccessReader reader = createReader();
+        reader.seek(position);
+        return reader;
+    }
+
+    /**
+     * Drop the page cache from the start of the file up to the given {@code before} position.
+     *
+     * @param before uncompressed position from the start of the file up to which the page cache is dropped;
+     *               if 0, the page cache is dropped up to the end of the file.
+     */
+    public void dropPageCache(long before)
+    {
+        long position = compressionMetadata.map(metadata -> {
+            if (before >= metadata.dataLength)
+                return 0L;
+            else
+                return metadata.chunkFor(before).offset;
+        }).orElse(before);
+        CLibrary.trySkipCache(channel.getFileDescriptor(), 0, position, path());
+    }
+
+    private Rebufferer instantiateRebufferer(RateLimiter limiter)
+    {
+        Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer();
+
+        if (limiter != null)
+            rebufferer = new LimitingRebufferer(rebufferer, limiter, DiskOptimizationStrategy.MAX_BUFFER_SIZE);
+        return rebufferer;
+    }
+
+    /**
+     * Performs cleanup of all resources held by a {@link FileHandle}.
+     */
+    private static class Cleanup implements RefCounted.Tidy
+    {
+        final ChannelProxy channel;
+        final RebuffererFactory rebufferer;
+        final CompressionMetadata compressionMetadata;
+        final Optional<ChunkCache> chunkCache;
+
+        private Cleanup(ChannelProxy channel,
+                        RebuffererFactory rebufferer,
+                        CompressionMetadata compressionMetadata,
+                        ChunkCache chunkCache)
+        {
+            this.channel = channel;
+            this.rebufferer = rebufferer;
+            this.compressionMetadata = compressionMetadata;
+            this.chunkCache = Optional.ofNullable(chunkCache);
+        }
+
+        public String name()
+        {
+            return channel.filePath();
+        }
+
+        public void tidy()
+        {
+            chunkCache.ifPresent(cache -> cache.invalidateFile(name()));
+            try
+            {
+                if (compressionMetadata != null)
+                {
+                    compressionMetadata.close();
+                }
+            }
+            finally
+            {
+                try
+                {
+                    channel.close();
+                }
+                finally
+                {
+                    rebufferer.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Configures how the file will be read (compressed, mmapped, use of chunk cache, etc.).
+     */
+    public static class Builder implements AutoCloseable
+    {
+        private final String path;
+
+        private ChannelProxy channel;
+        private CompressionMetadata compressionMetadata;
+        private MmappedRegions regions;
+        private ChunkCache chunkCache;
+        private int bufferSize = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+        private BufferType bufferType = BufferType.OFF_HEAP;
+
+        private boolean mmapped = false;
+        private boolean compressed = false;
+
+        public Builder(String path)
+        {
+            this.path = path;
+        }
+
+        public Builder(ChannelProxy channel)
+        {
+            this.channel = channel;
+            this.path = channel.filePath();
+        }
+
+        public Builder compressed(boolean compressed)
+        {
+            this.compressed = compressed;
+            return this;
+        }
+
+        /**
+         * Set {@link ChunkCache} to use.
+         *
+         * @param chunkCache ChunkCache object to use for caching
+         * @return this object
+         */
+        public Builder withChunkCache(ChunkCache chunkCache)
+        {
+            this.chunkCache = chunkCache;
+            return this;
+        }
+
+        /**
+         * Provide {@link CompressionMetadata} to use when reading a compressed file.
+         *
+         * @param metadata CompressionMetadata to use
+         * @return this object
+         */
+        public Builder withCompressionMetadata(CompressionMetadata metadata)
+        {
+            this.compressed = Objects.nonNull(metadata);
+            this.compressionMetadata = metadata;
+            return this;
+        }
+
+        /**
+         * Set whether to use mmap for reading
+         *
+         * @param mmapped true if using mmap
+         * @return this instance
+         */
+        public Builder mmapped(boolean mmapped)
+        {
+            this.mmapped = mmapped;
+            return this;
+        }
+
+        /**
+         * Set the buffer size to use (if appropriate).
+         *
+         * @param bufferSize Buffer size in bytes
+         * @return this instance
+         */
+        public Builder bufferSize(int bufferSize)
+        {
+            this.bufferSize = bufferSize;
+            return this;
+        }
+
+        /**
+         * Set the buffer type (on heap or off heap) to use (if appropriate).
+         *
+         * @param bufferType Buffer type to use
+         * @return this instance
+         */
+        public Builder bufferType(BufferType bufferType)
+        {
+            this.bufferType = bufferType;
+            return this;
+        }
+
+        /**
+         * Complete building {@link FileHandle} without overriding file length.
+         *
+         * @see #complete(long)
+         */
+        public FileHandle complete()
+        {
+            return complete(-1L);
+        }
+
+        /**
+         * Complete building {@link FileHandle} with the given length, which overrides the file length.
+         *
+         * @param overrideLength Override file length (in bytes) so that reads cannot go beyond this value.
+         *                       If the value is less than or equal to 0, it is ignored.
+         * @return Built file
+         */
+        @SuppressWarnings("resource")
+        public FileHandle complete(long overrideLength)
+        {
+            if (channel == null)
+            {
+                channel = new ChannelProxy(path);
+            }
+
+            ChannelProxy channelCopy = channel.sharedCopy();
+            try
+            {
+                if (compressed && compressionMetadata == null)
+                    compressionMetadata = CompressionMetadata.create(channelCopy.filePath());
+
+                long length = overrideLength > 0 ? overrideLength : compressed ? compressionMetadata.compressedFileLength : channelCopy.size();
+
+                RebuffererFactory rebuffererFactory;
+                if (mmapped)
+                {
+                    if (compressed)
+                    {
+                        regions = MmappedRegions.map(channelCopy, compressionMetadata);
+                        rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channelCopy, compressionMetadata,
+                                                                                       regions));
+                    }
+                    else
+                    {
+                        updateRegions(channelCopy, length);
+                        rebuffererFactory = new MmapRebufferer(channelCopy, length, regions.sharedCopy());
+                    }
+                }
+                else
+                {
+                    regions = null;
+                    if (compressed)
+                    {
+                        rebuffererFactory = maybeCached(new CompressedChunkReader.Standard(channelCopy, compressionMetadata));
+                    }
+                    else
+                    {
+                        rebuffererFactory = maybeCached(new SimpleChunkReader(channelCopy, length, bufferType, bufferSize));
+                    }
+                }
+                Cleanup cleanup = new Cleanup(channelCopy, rebuffererFactory, compressionMetadata, chunkCache);
+                return new FileHandle(cleanup, channelCopy, rebuffererFactory, compressionMetadata, length);
+            }
+            catch (Throwable t)
+            {
+                channelCopy.close();
+                throw t;
+            }
+        }
+
+        public Throwable close(Throwable accumulate)
+        {
+            if (!compressed && regions != null)
+                accumulate = regions.close(accumulate);
+            if (channel != null)
+                return channel.close(accumulate);
+
+            return accumulate;
+        }
+
+        public void close()
+        {
+            maybeFail(close(null));
+        }
+
+        private RebuffererFactory maybeCached(ChunkReader reader)
+        {
+            if (chunkCache != null && chunkCache.capacity() > 0)
+                return chunkCache.wrap(reader);
+            return reader;
+        }
+
+        private void updateRegions(ChannelProxy channel, long length)
+        {
+            if (regions != null && !regions.isValid(channel))
+            {
+                Throwable err = regions.close(null);
+                if (err != null)
+                    logger.error("Failed to close mapped regions", err);
+
+                regions = null;
+            }
+
+            if (regions == null)
+                regions = MmappedRegions.map(channel, length);
+            else
+                regions.extend(length);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(path='" + path() + '\'' +
+               ", length=" + rebuffererFactory.fileLength() +
+               ')';
+    }
+}
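
A minimal usage sketch of the new Builder API introduced above (an illustration only, not part of the patch): the path below is a placeholder, and in the tree itself SSTableReader is the component that wires in compression metadata, mmap and the chunk cache.

import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.RandomAccessReader;

public class FileHandleExample
{
    public static void readExample() throws Exception
    {
        // The Builder keeps its channel open so complete() can be called repeatedly
        // (early open with different overrideLength values); it must be closed when done.
        try (FileHandle.Builder builder = new FileHandle.Builder("/path/to/Data.db")
                                              .bufferSize(RandomAccessReader.DEFAULT_BUFFER_SIZE)
                                              .mmapped(false)
                                              .compressed(false);
             FileHandle handle = builder.complete();               // or complete(overrideLength)
             RandomAccessReader reader = handle.createReader())    // or createReader(limiter)
        {
            reader.seek(0);
            // ... read through the DataInput methods inherited by RandomAccessReader
        }
    }
}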

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
deleted file mode 100644
index e69487c..0000000
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-
-public interface ICompressedFile
-{
-    ChannelProxy channel();
-    CompressionMetadata getMetadata();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
index 9d79919..8df6370 100644
--- a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
@@ -28,7 +28,7 @@ class MmapRebufferer extends AbstractReaderFileProxy implements Rebufferer, Rebu
 {
     protected final MmappedRegions regions;
 
-    public MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions)
+    MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions)
     {
         super(channel, fileLength);
         this.regions = regions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/MmappedRegions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
index f269b84..9ab8abf 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -91,6 +91,11 @@ public class MmappedRegions extends SharedCloseableImpl
         return new MmappedRegions(channel, null, 0);
     }
 
+    /**
+     * @param channel file to map. The MmappedRegions instance will hold a shared copy of the given channel.
+     * @param metadata compression metadata of the file to map
+     * @return new instance
+     */
     public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metadata)
     {
         if (metadata == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
deleted file mode 100644
index d514bf8..0000000
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.sstable.format.Version;
-
-public class MmappedSegmentedFile extends SegmentedFile
-{
-    private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
-
-    public MmappedSegmentedFile(ChannelProxy channel, long length, MmappedRegions regions)
-    {
-        this(channel, new MmapRebufferer(channel, length, regions), length);
-    }
-
-    public MmappedSegmentedFile(ChannelProxy channel, RebuffererFactory rebufferer, long length)
-    {
-        super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
-    }
-
-    private MmappedSegmentedFile(MmappedSegmentedFile copy)
-    {
-        super(copy);
-    }
-
-    public MmappedSegmentedFile sharedCopy()
-    {
-        return new MmappedSegmentedFile(this);
-    }
-
-    /**
-     * Overrides the default behaviour to create segments of a maximum size.
-     */
-    static class Builder extends SegmentedFile.Builder
-    {
-        private MmappedRegions regions;
-
-        Builder()
-        {
-            super();
-        }
-
-        public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
-        {
-            long length = overrideLength > 0 ? overrideLength : channel.size();
-            updateRegions(channel, length);
-
-            return new MmappedSegmentedFile(channel, length, regions.sharedCopy());
-        }
-
-        private void updateRegions(ChannelProxy channel, long length)
-        {
-            if (regions != null && !regions.isValid(channel))
-            {
-                Throwable err = regions.close(null);
-                if (err != null)
-                    logger.error("Failed to close mapped regions", err);
-
-                regions = null;
-            }
-
-            if (regions == null)
-                regions = MmappedRegions.map(channel, length);
-            else
-                regions.extend(length);
-        }
-
-        @Override
-        public void serializeBounds(DataOutput out, Version version) throws IOException
-        {
-            if (!version.hasBoundaries())
-                return;
-
-            super.serializeBounds(out, version);
-            out.writeInt(0);
-        }
-
-        @Override
-        public void deserializeBounds(DataInput in, Version version) throws IOException
-        {
-            if (!version.hasBoundaries())
-                return;
-
-            super.deserializeBounds(in, version);
-            in.skipBytes(in.readInt() * TypeSizes.sizeof(0L));
-        }
-
-        @Override
-        public Throwable close(Throwable accumulate)
-        {
-            return super.close(regions == null
-                               ? accumulate
-                               : regions.close(accumulate));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 725b367..5157eac 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -17,50 +17,37 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.*;
-import java.nio.ByteBuffer;
+import java.io.File;
+import java.io.IOException;
 import java.nio.ByteOrder;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
-import org.apache.cassandra.utils.memory.BufferPool;
 
 public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
 {
     // The default buffer size when the client doesn't specify it
     public static final int DEFAULT_BUFFER_SIZE = 4096;
 
-    // The maximum buffer size, we will never buffer more than this size. Further,
-    // when the limiter is not null, i.e. when throttling is enabled, we read exactly
-    // this size, since when throttling the intention is to eventually read everything,
-    // see CASSANDRA-8630
-    // NOTE: this size is chosen both for historical consistency, as a reasonable upper bound,
-    //       and because our BufferPool currently has a maximum allocation size of this.
-    public static final int MAX_BUFFER_SIZE = 1 << 16; // 64k
-
     // offset of the last file mark
-    protected long markedPointer;
+    private long markedPointer;
 
-    @VisibleForTesting
     final Rebufferer rebufferer;
-    BufferHolder bufferHolder = Rebufferer.EMPTY;
+    private BufferHolder bufferHolder = Rebufferer.EMPTY;
 
-    protected RandomAccessReader(Rebufferer rebufferer)
+    /**
+     * Only created through Builder
+     *
+     * @param rebufferer Rebufferer to use
+     */
+    RandomAccessReader(Rebufferer rebufferer)
     {
         super(Rebufferer.EMPTY.buffer());
         this.rebufferer = rebufferer;
     }
 
-    public static ByteBuffer allocateBuffer(int size, BufferType bufferType)
-    {
-        return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
-    }
-
     /**
      * Read data from file starting from current currentOffset to populate buffer.
      */
@@ -72,7 +59,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         reBufferAt(current());
     }
 
-    public void reBufferAt(long position)
+    private void reBufferAt(long position)
     {
         bufferHolder.release();
         bufferHolder = rebufferer.rebuffer(position);
@@ -188,11 +175,11 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     /**
      * Class to hold a mark to the position of the file
      */
-    protected static class BufferedRandomAccessFileMark implements DataPosition
+    private static class BufferedRandomAccessFileMark implements DataPosition
     {
         final long pointer;
 
-        public BufferedRandomAccessFileMark(long pointer)
+        private BufferedRandomAccessFileMark(long pointer)
         {
             this.pointer = pointer;
         }
@@ -283,139 +270,14 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         return rebufferer.getCrcCheckChance();
     }
 
-    protected static Rebufferer instantiateRebufferer(RebuffererFactory fileRebufferer, RateLimiter limiter)
-    {
-        Rebufferer rebufferer = fileRebufferer.instantiateRebufferer();
-
-        if (limiter != null)
-            rebufferer = new LimitingRebufferer(rebufferer, limiter, MAX_BUFFER_SIZE);
-
-        return rebufferer;
-    }
-
-    public static RandomAccessReader build(SegmentedFile file, RateLimiter limiter)
-    {
-        return new RandomAccessReader(instantiateRebufferer(file.rebuffererFactory(), limiter));
-    }
-
-    public static Builder builder(ChannelProxy channel)
-    {
-        return new Builder(channel);
-    }
-
-    public static class Builder
-    {
-        // The NIO file channel or an empty channel
-        public final ChannelProxy channel;
-
-        // The size of the buffer for buffered readers
-        protected int bufferSize;
-
-        // The type of the buffer for buffered readers
-        public BufferType bufferType;
-
-        // The buffer
-        public ByteBuffer buffer;
-
-        // An optional limiter that will throttle the amount of data we read
-        public RateLimiter limiter;
-
-        // The mmap segments for mmap readers
-        public MmappedRegions regions;
-
-        // Compression for compressed readers
-        public CompressionMetadata compression;
-
-        public Builder(ChannelProxy channel)
-        {
-            this.channel = channel;
-            this.bufferSize = DEFAULT_BUFFER_SIZE;
-            this.bufferType = BufferType.OFF_HEAP;
-        }
-
-        /** The buffer size is typically already page aligned but if that is not the case
-         * make sure that it is a multiple of the page size, 4096. Also limit it to the maximum
-         * buffer size unless we are throttling, in which case we may as well read the maximum
-         * directly since the intention is to read the full file, see CASSANDRA-8630.
-         * */
-        private int adjustedBufferSize()
-        {
-            if (limiter != null)
-                return MAX_BUFFER_SIZE;
-
-            // should already be a page size multiple but if that's not case round it up
-            int wholePageSize = (bufferSize + 4095) & ~4095;
-            return Math.min(MAX_BUFFER_SIZE, wholePageSize);
-        }
-
-        protected Rebufferer createRebufferer()
-        {
-            return instantiateRebufferer(chunkReader(), limiter);
-        }
-
-        public RebuffererFactory chunkReader()
-        {
-            if (compression != null)
-                return CompressedSegmentedFile.chunkReader(channel, compression, regions);
-            if (regions != null)
-                return new MmapRebufferer(channel, -1, regions);
-
-            int adjustedSize = adjustedBufferSize();
-            return new SimpleChunkReader(channel, -1, bufferType, adjustedSize);
-        }
-
-        public Builder bufferSize(int bufferSize)
-        {
-            if (bufferSize <= 0)
-                throw new IllegalArgumentException("bufferSize must be positive");
-
-            this.bufferSize = bufferSize;
-            return this;
-        }
-
-        public Builder bufferType(BufferType bufferType)
-        {
-            this.bufferType = bufferType;
-            return this;
-        }
-
-        public Builder regions(MmappedRegions regions)
-        {
-            this.regions = regions;
-            return this;
-        }
-
-        public Builder compression(CompressionMetadata metadata)
-        {
-            this.compression = metadata;
-            return this;
-        }
-
-        public Builder limiter(RateLimiter limiter)
-        {
-            this.limiter = limiter;
-            return this;
-        }
-
-        public RandomAccessReader build()
-        {
-            return new RandomAccessReader(createRebufferer());
-        }
-
-        public RandomAccessReader buildWithChannel()
-        {
-            return new RandomAccessReaderWithOwnChannel(createRebufferer());
-        }
-    }
-
     // A wrapper of the RandomAccessReader that closes the channel when done.
     // For performance reasons RAR does not increase the reference count of
     // a channel but assumes the owner will keep it open and close it,
     // see CASSANDRA-9379, this thin class is just for those cases where we do
     // not have a shared channel.
-    public static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
+    static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
     {
-        protected RandomAccessReaderWithOwnChannel(Rebufferer rebufferer)
+        RandomAccessReaderWithOwnChannel(Rebufferer rebufferer)
         {
             super(rebufferer);
         }
@@ -441,14 +303,26 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         }
     }
 
+    /**
+     * Open a RandomAccessReader (not compressed, not mmapped, no read throttling) that will own its channel.
+     *
+     * @param file File to open for reading
+     * @return new RandomAccessReader that owns the channel opened in this method.
+     */
     @SuppressWarnings("resource")
     public static RandomAccessReader open(File file)
     {
-        return new Builder(new ChannelProxy(file)).buildWithChannel();
-    }
-
-    public static RandomAccessReader open(ChannelProxy channel)
-    {
-        return new Builder(channel).build();
+        ChannelProxy channel = new ChannelProxy(file);
+        try
+        {
+            ChunkReader reader = new SimpleChunkReader(channel, -1, BufferType.OFF_HEAP, DEFAULT_BUFFER_SIZE);
+            Rebufferer rebufferer = reader.instantiateRebufferer();
+            return new RandomAccessReaderWithOwnChannel(rebufferer);
+        }
+        catch (Throwable t)
+        {
+            channel.close();
+            throw t;
+        }
     }
 }
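
With the old RandomAccessReader.Builder removed, plain uncompressed files are opened through the static open(File) shown above, which returns a reader that owns (and closes) its channel; everything else is now built through FileHandle. A minimal sketch, assuming a placeholder file path:

import java.io.File;
import org.apache.cassandra.io.util.RandomAccessReader;

public class RandomAccessReaderExample
{
    public static void dumpFile() throws Exception
    {
        File file = new File("/path/to/plain-file.bin"); // placeholder
        // open(File) wraps the reader in RandomAccessReaderWithOwnChannel,
        // so closing the reader also closes the channel opened here.
        try (RandomAccessReader reader = RandomAccessReader.open(file))
        {
            while (reader.bytesRemaining() > 0)
            {
                byte b = reader.readByte();
                // ... process b
            }
        }
    }
}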

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/Rebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Rebufferer.java b/src/java/org/apache/cassandra/io/util/Rebufferer.java
index e88c7cb..2fc7ffa 100644
--- a/src/java/org/apache/cassandra/io/util/Rebufferer.java
+++ b/src/java/org/apache/cassandra/io/util/Rebufferer.java
@@ -38,7 +38,7 @@ public interface Rebufferer extends ReaderFileProxy
      */
     void closeReader();
 
-    public interface BufferHolder
+    interface BufferHolder
     {
         /**
          * Returns a useable buffer (i.e. one whose position and limit can be freely modified). Its limit will be set
@@ -59,7 +59,7 @@ public interface Rebufferer extends ReaderFileProxy
         void release();
     }
 
-    static final BufferHolder EMPTY = new BufferHolder()
+    BufferHolder EMPTY = new BufferHolder()
     {
         final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
deleted file mode 100644
index 62e14ba..0000000
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.IndexSummary;
-import org.apache.cassandra.io.sstable.IndexSummaryBuilder;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.concurrent.RefCounted;
-import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
-
-import static org.apache.cassandra.utils.Throwables.maybeFail;
-
-/**
- * Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
- * FileDataInput. Allows for iteration over the FileDataInputs, or random access to the FileDataInput for a given
- * position.
- *
- * The JVM can only map up to 2GB at a time, so each segment is at most that size when using mmap i/o. If a segment
- * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
- * each access to that segment.
- */
-public abstract class SegmentedFile extends SharedCloseableImpl
-{
-    public final ChannelProxy channel;
-
-    // This differs from length for compressed files (but we still need length for
-    // SegmentIterator because offsets in the file are relative to the uncompressed size)
-    public final long onDiskLength;
-
-    /**
-     * Rebufferer to use to construct RandomAccessReaders.
-     */
-    private final RebuffererFactory rebufferer;
-
-    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, RebuffererFactory rebufferer, long onDiskLength)
-    {
-        super(cleanup);
-        this.rebufferer = rebufferer;
-        this.channel = channel;
-        this.onDiskLength = onDiskLength;
-    }
-
-    protected SegmentedFile(SegmentedFile copy)
-    {
-        super(copy);
-        channel = copy.channel;
-        rebufferer = copy.rebufferer;
-        onDiskLength = copy.onDiskLength;
-    }
-
-    public String path()
-    {
-        return channel.filePath();
-    }
-
-    public long dataLength()
-    {
-        return rebufferer.fileLength();
-    }
-
-    public RebuffererFactory rebuffererFactory()
-    {
-        return rebufferer;
-    }
-
-    protected static class Cleanup implements RefCounted.Tidy
-    {
-        final ChannelProxy channel;
-        final ReaderFileProxy rebufferer;
-        protected Cleanup(ChannelProxy channel, ReaderFileProxy rebufferer)
-        {
-            this.channel = channel;
-            this.rebufferer = rebufferer;
-        }
-
-        public String name()
-        {
-            return channel.filePath();
-        }
-
-        public void tidy()
-        {
-            try
-            {
-                channel.close();
-            }
-            finally
-            {
-                rebufferer.close();
-            }
-        }
-    }
-
-    public abstract SegmentedFile sharedCopy();
-
-    public RandomAccessReader createReader()
-    {
-        return RandomAccessReader.build(this, null);
-    }
-
-    public RandomAccessReader createReader(RateLimiter limiter)
-    {
-        return RandomAccessReader.build(this, limiter);
-    }
-
-    public FileDataInput createReader(long position)
-    {
-        RandomAccessReader reader = createReader();
-        reader.seek(position);
-        return reader;
-    }
-
-    public void dropPageCache(long before)
-    {
-        CLibrary.trySkipCache(channel.getFileDescriptor(), 0, before, path());
-    }
-
-    /**
-     * @return A SegmentedFile.Builder.
-     */
-    public static Builder getBuilder(Config.DiskAccessMode mode, boolean compressed)
-    {
-        return compressed ? new CompressedSegmentedFile.Builder(null)
-                          : mode == Config.DiskAccessMode.mmap ? new MmappedSegmentedFile.Builder()
-                                                               : new BufferedSegmentedFile.Builder();
-    }
-
-    public static Builder getCompressedBuilder(CompressedSequentialWriter writer)
-    {
-        return new CompressedSegmentedFile.Builder(writer);
-    }
-
-    /**
-     * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
-     */
-    public static abstract class Builder implements AutoCloseable
-    {
-        private ChannelProxy channel;
-
-        /**
-         * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
-         * @param channel The channel to the file on disk.
-         */
-        protected abstract SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength);
-
-        @SuppressWarnings("resource") // SegmentedFile owns channel
-        private SegmentedFile complete(String path, int bufferSize, long overrideLength)
-        {
-            ChannelProxy channelCopy = getChannel(path);
-            try
-            {
-                return complete(channelCopy, bufferSize, overrideLength);
-            }
-            catch (Throwable t)
-            {
-                channelCopy.close();
-                throw t;
-            }
-        }
-
-        public SegmentedFile buildData(Descriptor desc, StatsMetadata stats, IndexSummaryBuilder.ReadableBoundary boundary)
-        {
-            return complete(desc.filenameFor(Component.DATA), bufferSize(stats), boundary.dataLength);
-        }
-
-        public SegmentedFile buildData(Descriptor desc, StatsMetadata stats)
-        {
-            return complete(desc.filenameFor(Component.DATA), bufferSize(stats), -1L);
-        }
-
-        public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary, IndexSummaryBuilder.ReadableBoundary boundary)
-        {
-            return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), boundary.indexLength);
-        }
-
-        public SegmentedFile buildIndex(Descriptor desc, IndexSummary indexSummary)
-        {
-            return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L);
-        }
-
-        private static int bufferSize(StatsMetadata stats)
-        {
-            return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
-        }
-
-        private static int bufferSize(Descriptor desc, IndexSummary indexSummary)
-        {
-            File file = new File(desc.filenameFor(Component.PRIMARY_INDEX));
-            return bufferSize(file.length() / indexSummary.size());
-        }
-
-        /**
-            Return the buffer size for a given record size. For spinning disks always add one page.
-            For solid state disks only add one page if the chance of crossing to the next page is more
-            than a predifined value, @see Config.disk_optimization_page_cross_chance.
-         */
-        static int bufferSize(long recordSize)
-        {
-            Config.DiskOptimizationStrategy strategy = DatabaseDescriptor.getDiskOptimizationStrategy();
-            if (strategy == Config.DiskOptimizationStrategy.ssd)
-            {
-                // The crossing probability is calculated assuming a uniform distribution of record
-                // start position in a page, so it's the record size modulo the page size divided by
-                // the total page size.
-                double pageCrossProbability = (recordSize % 4096) / 4096.;
-                // if the page cross probability is equal or bigger than disk_optimization_page_cross_chance we add one page
-                if ((pageCrossProbability - DatabaseDescriptor.getDiskOptimizationPageCrossChance()) > -1e-16)
-                    recordSize += 4096;
-
-                return roundBufferSize(recordSize);
-            }
-            else if (strategy == Config.DiskOptimizationStrategy.spinning)
-            {
-                return roundBufferSize(recordSize + 4096);
-            }
-            else
-            {
-                throw new IllegalStateException("Unsupported disk optimization strategy: " + strategy);
-            }
-        }
-
-        /**
-           Round up to the next multiple of 4k but no more than 64k
-         */
-        static int roundBufferSize(long size)
-        {
-            if (size <= 0)
-                return 4096;
-
-            size = (size + 4095) & ~4095;
-            return (int)Math.min(size, 1 << 16);
-        }
-
-        public void serializeBounds(DataOutput out, Version version) throws IOException
-        {
-            if (!version.hasBoundaries())
-                return;
-
-            out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
-        }
-
-        public void deserializeBounds(DataInput in, Version version) throws IOException
-        {
-            if (!version.hasBoundaries())
-                return;
-
-            if (!in.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name()))
-                throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
-        }
-
-        public Throwable close(Throwable accumulate)
-        {
-            if (channel != null)
-                return channel.close(accumulate);
-
-            return accumulate;
-        }
-
-        public void close()
-        {
-            maybeFail(close(null));
-        }
-
-        private ChannelProxy getChannel(String path)
-        {
-            if (channel != null)
-            {
-                // This is really fragile, both path and channel.filePath()
-                // must agree, i.e. they both must be absolute or both relative
-                // eventually we should really pass the filePath to the builder
-                // constructor and remove this
-                if (channel.filePath().equals(path))
-                    return channel.sharedCopy();
-                else
-                    channel.close();
-            }
-
-            channel = new ChannelProxy(path);
-            return channel.sharedCopy();
-        }
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "(path='" + path() + '\'' +
-               ", length=" + rebufferer.fileLength() +
-               ')';
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
index 7bfb57b..bc1a529 100644
--- a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
+++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -27,7 +27,7 @@ class SimpleChunkReader extends AbstractReaderFileProxy implements ChunkReader
     private final int bufferSize;
     private final BufferType bufferType;
 
-    public SimpleChunkReader(ChannelProxy channel, long fileLength, BufferType bufferType, int bufferSize)
+    SimpleChunkReader(ChannelProxy channel, long fileLength, BufferType bufferType, int bufferSize)
     {
         super(channel, fileLength);
         this.bufferSize = bufferSize;
@@ -55,15 +55,9 @@ class SimpleChunkReader extends AbstractReaderFileProxy implements ChunkReader
     }
 
     @Override
-    public boolean alignmentRequired()
-    {
-        return false;
-    }
-
-    @Override
     public Rebufferer instantiateRebufferer()
     {
-        return BufferManagingRebufferer.on(this);
+        return new BufferManagingRebufferer.Unaligned(this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java b/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java
new file mode 100644
index 0000000..5cec282
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SpinningDiskOptimizationStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public class SpinningDiskOptimizationStrategy implements DiskOptimizationStrategy
+{
+    /**
+     * For spinning disks always add one page.
+     */
+    @Override
+    public int bufferSize(long recordSize)
+    {
+        return roundBufferSize(recordSize + 4096);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java b/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java
new file mode 100644
index 0000000..032ec2b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SsdDiskOptimizationStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+public class SsdDiskOptimizationStrategy implements DiskOptimizationStrategy
+{
+    private final double diskOptimizationPageCrossChance;
+
+    public SsdDiskOptimizationStrategy(double diskOptimizationPageCrossChance)
+    {
+        this.diskOptimizationPageCrossChance = diskOptimizationPageCrossChance;
+    }
+
+    /**
+     * For solid state disks, only add one page if the chance of crossing to the next page is more
+     * than a predefined value.
+     *
+     * @see org.apache.cassandra.config.Config#disk_optimization_page_cross_chance
+     */
+    @Override
+    public int bufferSize(long recordSize)
+    {
+        // The crossing probability is calculated assuming a uniform distribution of record
+        // start position in a page, so it's the record size modulo the page size divided by
+        // the total page size.
+        double pageCrossProbability = (recordSize % 4096) / 4096.;
+        // if the page cross probability is equal to or greater than disk_optimization_page_cross_chance, we add one page
+        if ((pageCrossProbability - diskOptimizationPageCrossChance) > -1e-16)
+            recordSize += 4096;
+
+        return roundBufferSize(recordSize);
+    }
+}
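
To make the heuristic concrete, a worked example (0.1 is used here as an assumed value for disk_optimization_page_cross_chance): a 6000-byte record gives 6000 % 4096 = 1904, so the page-cross probability is 1904 / 4096 ≈ 0.46; that exceeds 0.1, so one page is added (6000 + 4096 = 10096) and roundBufferSize rounds up to the next 4 KiB boundary, yielding a 12288-byte buffer. A 4200-byte record gives 104 / 4096 ≈ 0.03, below the threshold, so nothing is added and the buffer rounds to 8192 bytes. The SpinningDiskOptimizationStrategy above skips the probability check entirely and always adds the extra page before rounding.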

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index a406290..7f9de55 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -34,16 +34,14 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -62,7 +60,7 @@ public class MockSchema
     public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
 
     public static final IndexSummary indexSummary;
-    private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+    private static final FileHandle RANDOM_ACCESS_READER_FACTORY = new FileHandle.Builder(temp("mocksegmentedfile").getAbsolutePath()).complete();
 
     public static Memtable memtable(ColumnFamilyStore cfs)
     {
@@ -89,7 +87,7 @@ public class MockSchema
         Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
                                                cfs.getColumnFamilyName(),
-                                               generation);
+                                               generation, SSTableFormat.Type.BIG);
         Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
         for (Component component : components)
         {
@@ -122,7 +120,7 @@ public class MockSchema
                                                  .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, -1, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
-                                                          segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
+                                                          RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
                                                           new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
         reader.first = reader.last = readerBounds(generation);
         if (!keepRef)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index af43152..5d01886 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.ClearableHistogram;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -350,8 +351,9 @@ public class ColumnFamilyStoreTest
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
-            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
+            Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version,
+                                                 SSTableFormat.Type.BIG);
+            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version, SSTableFormat.Type.BIG);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
         }
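
The Descriptor change in this hunk is the one that recurs through most of the test diffs below: the constructor now takes the SSTableFormat.Type explicitly in place of the old four-argument form. A minimal sketch of the new call shape (helper name and arguments are illustrative):

    import java.io.File;
    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.format.SSTableFormat;

    static File dataFileFor(File dataDir, String keyspace, String table, int generation)
    {
        // The format type is now an explicit constructor argument; the tests pass BIG.
        Descriptor desc = new Descriptor(dataDir, keyspace, table, generation, SSTableFormat.Type.BIG);
        return new File(desc.filenameFor(Component.DATA));
    }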

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index f8d01a8..9a1c0bd 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
@@ -117,7 +118,7 @@ public class DirectoriesTest
 
     private static void createFakeSSTable(File dir, String cf, int gen, List<File> addTo) throws IOException
     {
-        Descriptor desc = new Descriptor(dir, KS, cf, gen);
+        Descriptor desc = new Descriptor(dir, KS, cf, gen, SSTableFormat.Type.BIG);
         for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
         {
             File f = new File(desc.filenameFor(c));
@@ -152,7 +153,7 @@ public class DirectoriesTest
             Directories directories = new Directories(cfm);
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
 
-            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
+            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, SSTableFormat.Type.BIG);
             File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
             assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
 
@@ -186,8 +187,8 @@ public class DirectoriesTest
         {
             assertEquals(cfDir(INDEX_CFM), dir);
         }
-        Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0);
-        Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0);
+        Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0, SSTableFormat.Type.BIG);
+        Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0, SSTableFormat.Type.BIG);
 
         // snapshot dir should be created under its parent's
         File parentSnapshotDirectory = Directories.getSnapshotDirectory(parentDesc, "test");
@@ -204,9 +205,9 @@ public class DirectoriesTest
                      indexDirectories.snapshotCreationTime("test"));
 
         // check true snapshot size
-        Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0);
+        Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0, SSTableFormat.Type.BIG);
         createFile(parentSnapshot.filenameFor(Component.DATA), 30);
-        Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0);
+        Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0, SSTableFormat.Type.BIG);
         createFile(indexSnapshot.filenameFor(Component.DATA), 40);
 
         assertEquals(30, parentDirectories.trueSnapshotsSize());
@@ -354,7 +355,7 @@ public class DirectoriesTest
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
                 public File call() throws Exception {
-                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
+                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, SSTableFormat.Type.BIG);
                     return Directories.getSnapshotDirectory(desc, n);
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 79eb449..3f43b51 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -52,7 +52,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -803,7 +802,7 @@ public class RowIndexEntryTest extends CQLTester
 
         RowIndexEntry rie = new RowIndexEntry(0L)
         {
-            public IndexInfoRetriever openWithIndex(SegmentedFile indexFile)
+            public IndexInfoRetriever openWithIndex(FileHandle indexFile)
             {
                 return new IndexInfoRetriever()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 5ed22e4..2620a31 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -46,15 +46,13 @@ import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -1156,7 +1154,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
 
     private static SSTableReader sstable(File dataFolder, ColumnFamilyStore cfs, int generation, int size) throws IOException
     {
-        Descriptor descriptor = new Descriptor(dataFolder, cfs.keyspace.getName(), cfs.getTableName(), generation);
+        Descriptor descriptor = new Descriptor(dataFolder, cfs.keyspace.getName(), cfs.getTableName(), generation, SSTableFormat.Type.BIG);
         Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
         for (Component component : components)
         {
@@ -1169,8 +1167,8 @@ public class LogTransactionTest extends AbstractTransactionalTest
             }
         }
 
-        SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
-        SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0);
+        FileHandle dFile = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).complete();
+        FileHandle iFile = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).complete();
 
         SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
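
This is the uncompressed counterpart of the FileHandle pattern: no ChannelProxy or BufferedSegmentedFile wiring, just one builder per component completed with default options. A minimal sketch (the helper name is mine, not part of the patch):

    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.util.FileHandle;

    static FileHandle openComponent(Descriptor descriptor, Component component)
    {
        // Default buffered access, no compression metadata, as in sstable(...) above.
        return new FileHandle.Builder(descriptor.filenameFor(component)).complete();
    }

    // Usage, mirroring the hunk above:
    //   FileHandle dFile = openComponent(descriptor, Component.DATA);
    //   FileHandle iFile = openComponent(descriptor, Component.PRIMARY_INDEX);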

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 309083b..2de6b62 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -70,39 +70,38 @@ public class CompressedRandomAccessReaderTest
     {
         File f = File.createTempFile("compressed6791_", "3");
         String filename = f.getAbsolutePath();
-        try(ChannelProxy channel = new ChannelProxy(f))
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+        try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
+                                                                               null, SequentialWriterOption.DEFAULT,
+                                                                               CompressionParams.snappy(32),
+                                                                               sstableMetadataCollector))
         {
 
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
-            try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
-                                                                                   null, SequentialWriterOption.DEFAULT,
-                                                                                   CompressionParams.snappy(32),
-                                                                                   sstableMetadataCollector))
-            {
-
-                for (int i = 0; i < 20; i++)
-                    writer.write("x".getBytes());
+            for (int i = 0; i < 20; i++)
+                writer.write("x".getBytes());
 
-                DataPosition mark = writer.mark();
-                // write enough garbage to create new chunks:
-                for (int i = 0; i < 40; ++i)
-                    writer.write("y".getBytes());
+            DataPosition mark = writer.mark();
+            // write enough garbage to create new chunks:
+            for (int i = 0; i < 40; ++i)
+                writer.write("y".getBytes());
 
-                writer.resetAndTruncate(mark);
+            writer.resetAndTruncate(mark);
 
-                for (int i = 0; i < 20; i++)
-                    writer.write("x".getBytes());
-                writer.finish();
-            }
+            for (int i = 0; i < 20; i++)
+                writer.write("x".getBytes());
+            writer.finish();
+        }
 
-            try(RandomAccessReader reader = RandomAccessReader.builder(channel)
-                                            .compression(new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
-                                            .build())
-            {
-                String res = reader.readLine();
-                assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
-                assertEquals(40, res.length());
-            }
+        try (FileHandle.Builder builder = new FileHandle.Builder(filename)
+                                                              .withCompressionMetadata(new CompressionMetadata(filename + ".metadata",
+                                                                                                               f.length(),
+                                                                                                               ChecksumType.CRC32));
+             FileHandle fh = builder.complete();
+             RandomAccessReader reader = fh.createReader())
+        {
+            String res = reader.readLine();
+            assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+            assertEquals(40, res.length());
         }
         finally
         {
@@ -117,56 +116,39 @@ public class CompressedRandomAccessReaderTest
     private static void testResetAndTruncate(File f, boolean compressed, boolean usemmap, int junkSize) throws IOException
     {
         final String filename = f.getAbsolutePath();
-        try(ChannelProxy channel = new ChannelProxy(f))
-        {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
-            try(SequentialWriter writer = compressed
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+        try(SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata",
-                                                 null, SequentialWriterOption.DEFAULT,
-                                                 CompressionParams.snappy(), sstableMetadataCollector)
+                null, SequentialWriterOption.DEFAULT,
+                CompressionParams.snappy(), sstableMetadataCollector)
                 : new SequentialWriter(f))
-            {
-                writer.write("The quick ".getBytes());
-                DataPosition mark = writer.mark();
-                writer.write("blue fox jumps over the lazy dog".getBytes());
-
-                // write enough to be sure to change chunk
-                for (int i = 0; i < junkSize; ++i)
-                {
-                    writer.write((byte) 1);
-                }
-
-                writer.resetAndTruncate(mark);
-                writer.write("brown fox jumps over the lazy dog".getBytes());
-                writer.finish();
-            }
-            assert f.exists();
-
-            CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null;
-            RandomAccessReader.Builder builder = RandomAccessReader.builder(channel);
-            if (compressed)
-                builder.compression(compressionMetadata);
+        {
+            writer.write("The quick ".getBytes());
+            DataPosition mark = writer.mark();
+            writer.write("blue fox jumps over the lazy dog".getBytes());
 
-            MmappedRegions regions = null;
-            if (usemmap)
+            // write enough to be sure to change chunk
+            for (int i = 0; i < junkSize; ++i)
             {
-                regions = compressed
-                        ? MmappedRegions.map(channel, compressionMetadata)
-                        : MmappedRegions.map(channel, f.length());
-                builder.regions(regions);
+                writer.write((byte) 1);
             }
 
-            try(RandomAccessReader reader = builder.build())
-            {
-                String expected = "The quick brown fox jumps over the lazy dog";
-                assertEquals(expected.length(), reader.length());
-                byte[] b = new byte[expected.length()];
-                reader.readFully(b);
-                assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\'';
-            }
+            writer.resetAndTruncate(mark);
+            writer.write("brown fox jumps over the lazy dog".getBytes());
+            writer.finish();
+        }
+        assert f.exists();
 
-            if (regions != null)
-                regions.close();
+        CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null;
+        try (FileHandle.Builder builder = new FileHandle.Builder(filename).mmapped(usemmap).withCompressionMetadata(compressionMetadata);
+             FileHandle fh = builder.complete();
+             RandomAccessReader reader = fh.createReader())
+        {
+            String expected = "The quick brown fox jumps over the lazy dog";
+            assertEquals(expected.length(), reader.length());
+            byte[] b = new byte[expected.length()];
+            reader.readFully(b);
+            assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\'';
         }
         finally
         {
@@ -201,70 +183,69 @@ public class CompressedRandomAccessReaderTest
             writer.finish();
         }
 
-        try(ChannelProxy channel = new ChannelProxy(file))
-        {
-            // open compression metadata and get chunk information
-            CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
-            CompressionMetadata.Chunk chunk = meta.chunkFor(0);
+        // open compression metadata and get chunk information
+        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
+        CompressionMetadata.Chunk chunk = meta.chunkFor(0);
 
-            try(RandomAccessReader reader = RandomAccessReader.builder(channel).compression(meta).build())
-            {// read and verify compressed data
-                assertEquals(CONTENT, reader.readLine());
+        try (FileHandle.Builder builder = new FileHandle.Builder(file.getPath()).withCompressionMetadata(meta);
+             FileHandle fh = builder.complete();
+             RandomAccessReader reader = fh.createReader())
+        {// read and verify compressed data
+            assertEquals(CONTENT, reader.readLine());
 
-                Random random = new Random();
-                RandomAccessFile checksumModifier = null;
+            Random random = new Random();
+            RandomAccessFile checksumModifier = null;
 
-                try
+            try
+            {
+                checksumModifier = new RandomAccessFile(file, "rw");
+                byte[] checksum = new byte[4];
+
+                // seek to the end of the compressed chunk
+                checksumModifier.seek(chunk.length);
+                // read checksum bytes
+                checksumModifier.read(checksum);
+                // seek back to the chunk end
+                checksumModifier.seek(chunk.length);
+
+                // lets modify one byte of the checksum on each iteration
+                for (int i = 0; i < checksum.length; i++)
                 {
-                    checksumModifier = new RandomAccessFile(file, "rw");
-                    byte[] checksum = new byte[4];
-
-                    // seek to the end of the compressed chunk
-                    checksumModifier.seek(chunk.length);
-                    // read checksum bytes
-                    checksumModifier.read(checksum);
-                    // seek back to the chunk end
-                    checksumModifier.seek(chunk.length);
+                    checksumModifier.write(random.nextInt());
+                    SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
 
-                    // lets modify one byte of the checksum on each iteration
-                    for (int i = 0; i < checksum.length; i++)
+                    try (final RandomAccessReader r = fh.createReader())
                     {
-                        checksumModifier.write(random.nextInt());
-                        SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
-
-                        try (final RandomAccessReader r = RandomAccessReader.builder(channel).compression(meta).build())
+                        Throwable exception = null;
+                        try
+                        {
+                            r.readLine();
+                        }
+                        catch (Throwable t)
                         {
-                            Throwable exception = null;
-                            try
-                            {
-                                r.readLine();
-                            }
-                            catch (Throwable t)
-                            {
-                                exception = t;
-                            }
-                            assertNotNull(exception);
-                            assertSame(exception.getClass(), CorruptSSTableException.class);
-                            assertSame(exception.getCause().getClass(), CorruptBlockException.class);
+                            exception = t;
                         }
+                        assertNotNull(exception);
+                        assertSame(exception.getClass(), CorruptSSTableException.class);
+                        assertSame(exception.getCause().getClass(), CorruptBlockException.class);
                     }
+                }
 
-                    // lets write original checksum and check if we can read data
-                    updateChecksum(checksumModifier, chunk.length, checksum);
+                // lets write original checksum and check if we can read data
+                updateChecksum(checksumModifier, chunk.length, checksum);
 
-                    try (RandomAccessReader cr = RandomAccessReader.builder(channel).compression(meta).build())
-                    {
-                        // read and verify compressed data
-                        assertEquals(CONTENT, cr.readLine());
-                        // close reader
-                    }
-                }
-                finally
+                try (RandomAccessReader cr = fh.createReader())
                 {
-                    if (checksumModifier != null)
-                        checksumModifier.close();
+                    // read and verify compressed data
+                    assertEquals(CONTENT, cr.readLine());
+                    // close reader
                 }
             }
+            finally
+            {
+                if (checksumModifier != null)
+                    checksumModifier.close();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 9959c7b..fa9643b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -34,7 +34,6 @@ import org.junit.Test;
 import junit.framework.Assert;
 
 import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,43 +81,42 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
     private void testWrite(File f, int bytesToTest) throws IOException
     {
         final String filename = f.getAbsolutePath();
-        final ChannelProxy channel = new ChannelProxy(f);
-
-        try
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Collections.singletonList(BytesType.instance)));
+
+        byte[] dataPre = new byte[bytesToTest];
+        byte[] rawPost = new byte[bytesToTest];
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
+                null, SequentialWriterOption.DEFAULT,
+                compressionParameters,
+                sstableMetadataCollector))
         {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Arrays.<AbstractType<?>>asList(BytesType.instance)));
-
-            byte[] dataPre = new byte[bytesToTest];
-            byte[] rawPost = new byte[bytesToTest];
-            try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata",
-                                                                                    null, SequentialWriterOption.DEFAULT,
-                                                                                    compressionParameters,
-                                                                                    sstableMetadataCollector))
+            Random r = new Random(42);
+
+            // Test both write with byte[] and ByteBuffer
+            r.nextBytes(dataPre);
+            r.nextBytes(rawPost);
+            ByteBuffer dataPost = makeBB(bytesToTest);
+            dataPost.put(rawPost);
+            dataPost.flip();
+
+            writer.write(dataPre);
+            DataPosition mark = writer.mark();
+
+            // Write enough garbage to transition chunk
+            for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
             {
-                Random r = new Random(42);
-
-                // Test both write with byte[] and ByteBuffer
-                r.nextBytes(dataPre);
-                r.nextBytes(rawPost);
-                ByteBuffer dataPost = makeBB(bytesToTest);
-                dataPost.put(rawPost);
-                dataPost.flip();
-
-                writer.write(dataPre);
-                DataPosition mark = writer.mark();
-
-                // Write enough garbage to transition chunk
-                for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
-                {
-                    writer.write((byte)i);
-                }
-                writer.resetAndTruncate(mark);
-                writer.write(dataPost);
-                writer.finish();
+                writer.write((byte)i);
             }
+            writer.resetAndTruncate(mark);
+            writer.write(dataPost);
+            writer.finish();
+        }
 
-            assert f.exists();
-            RandomAccessReader reader = RandomAccessReader.builder(channel).compression(new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)).build();
+        assert f.exists();
+        try (FileHandle.Builder builder = new FileHandle.Builder(filename).withCompressionMetadata(new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
+             FileHandle fh = builder.complete();
+             RandomAccessReader reader = fh.createReader())
+        {
             assertEquals(dataPre.length + rawPost.length, reader.length());
             byte[] result = new byte[(int)reader.length()];
 
@@ -134,9 +132,6 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         }
         finally
         {
-            // cleanup
-            channel.close();
-
             if (f.exists())
                 f.delete();
             File metadata = new File(f + ".metadata");
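
The read-back side of both compression tests above now follows one shape: configure a FileHandle.Builder with the compression metadata (and optionally mmap), complete() it, and pull RandomAccessReaders off the handle, all in a single try-with-resources. A minimal sketch of that shape, using only the calls visible in the hunks above and assuming the usual import locations (paths and the helper name are illustrative):

    import java.io.File;
    import java.io.IOException;
    import org.apache.cassandra.io.compress.CompressionMetadata;
    import org.apache.cassandra.io.util.FileHandle;
    import org.apache.cassandra.io.util.RandomAccessReader;
    import org.apache.cassandra.utils.ChecksumType;

    static String readFirstLine(String dataPath, String metadataPath, boolean mmap) throws IOException
    {
        CompressionMetadata meta = new CompressionMetadata(metadataPath, new File(dataPath).length(), ChecksumType.CRC32);
        try (FileHandle.Builder builder = new FileHandle.Builder(dataPath)
                                                         .mmapped(mmap)
                                                         .withCompressionMetadata(meta);
             FileHandle fh = builder.complete();
             RandomAccessReader reader = fh.createReader())
        {
            // The same handle can hand out further readers, as the
            // checksum-corruption loop above does with fh.createReader().
            return reader.readLine();
        }
    }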

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index f769293..6226ee6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -77,14 +77,14 @@ public class DescriptorTest
     private void testFromFilenameFor(File dir)
     {
         // normal
-        checkFromFilename(new Descriptor(dir, ksname, cfname, 1), false);
+        checkFromFilename(new Descriptor(dir, ksname, cfname, 1, SSTableFormat.Type.BIG), false);
         // skip component (for streaming lock file)
-        checkFromFilename(new Descriptor(dir, ksname, cfname, 2), true);
+        checkFromFilename(new Descriptor(dir, ksname, cfname, 2, SSTableFormat.Type.BIG), true);
 
         // secondary index
         String idxName = "myidx";
         File idxDir = new File(dir.getAbsolutePath() + File.separator + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName);
-        checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4), false);
+        checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4, SSTableFormat.Type.BIG), false);
 
         // legacy version
         checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, SSTableFormat.Type.LEGACY), false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 151a995..0928ad4 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -31,7 +31,6 @@ import org.junit.runner.RunWith;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -47,7 +46,6 @@ import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.MmappedRegions;
-import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.CacheService;
@@ -393,13 +391,7 @@ public class SSTableReaderTest
             SSTableReader sstable = indexCfs.getLiveSSTables().iterator().next();
             assert sstable.first.getToken() instanceof LocalToken;
 
-            try (SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(),
-                                                                           false);
-                 SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(),
-                                                                           sstable.compression))
-            {
-                sstable.saveSummary(ibuilder, dbuilder);
-            }
+            sstable.saveSummary();
             SSTableReader reopened = SSTableReader.open(sstable.descriptor);
             assert reopened.first.getToken() instanceof LocalToken;
             reopened.selfRef().release();
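
A smaller simplification rides along in this hunk: persisting the index summary no longer requires the caller to supply SegmentedFile builders, so the test collapses to a single call before reopening. The round-trip it exercises, in outline:

    // Persist the summary with no external builders, then reopen from disk.
    sstable.saveSummary();
    SSTableReader reopened = SSTableReader.open(sstable.descriptor);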

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4133f38/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 5c7ff02..df9d1aa 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 
@@ -76,7 +77,7 @@ public class SSTableUtils
         File cfDir = new File(tempdir, keyspaceName + File.separator + cfname);
         cfDir.mkdirs();
         cfDir.deleteOnExit();
-        File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation).filenameFor(Component.DATA));
+        File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation, SSTableFormat.Type.BIG).filenameFor(Component.DATA));
         if (!datafile.createNewFile())
             throw new IOException("unable to create file " + datafile);
         datafile.deleteOnExit();