Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/04 13:46:24 UTC

[03/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index b13d154..e9b0ee4 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -19,119 +19,129 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.BufferPool;
 
-public class RandomAccessReader extends AbstractDataInput implements FileDataInput
+public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
 {
+    // The default buffer size when the client doesn't specify it
     public static final int DEFAULT_BUFFER_SIZE = 4096;
 
+    // The maximum buffer size when the limiter is not null, i.e. when throttling
+    // is enabled. This is required to avoid acquiring permits that are too large.
+    public static final int MAX_THROTTLED_BUFFER_SIZE = 1 << 16; // 64k
+
     // the IO channel to the file, we do not own a reference to this due to
     // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to
     // ensure that the channel stays open and that it is closed afterwards
     protected final ChannelProxy channel;
 
-    // buffer which will cache file blocks
-    protected ByteBuffer buffer;
+    // optional memory mapped regions for the channel
+    protected final MmappedRegions regions;
 
-    // `bufferOffset` is the offset of the beginning of the buffer
-    // `markedPointer` folds the offset of the last file mark
-    protected long bufferOffset, markedPointer;
+    // An optional limiter that will throttle the amount of data we read
+    protected final RateLimiter limiter;
 
-    // this can be overridden at construction to a value shorter than the true length of the file;
-    // if so, it acts as an imposed limit on reads, rather than a convenience property
+    // the file length, this can be overridden at construction to a value shorter
+    // than the true length of the file; if so, it acts as an imposed limit on reads,
+    // required when opening sstables early so that we do not read past the early mark
     private final long fileLength;
 
-    protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, BufferType bufferType)
-    {
-        this.channel = channel;
+    // the buffer size for buffered readers
+    protected final int bufferSize;
 
-        if (bufferSize <= 0)
-            throw new IllegalArgumentException("bufferSize must be positive");
+    // the buffer type for buffered readers
+    private final BufferType bufferType;
 
-        // we can cache file length in read-only mode
-        fileLength = overrideLength <= 0 ? channel.size() : overrideLength;
+    // offset from the beginning of the file
+    protected long bufferOffset;
 
-        buffer = allocateBuffer(getBufferSize(bufferSize), bufferType);
-        buffer.limit(0);
-    }
+    // offset of the last file mark
+    protected long markedPointer;
 
-    /** The buffer size is typically already page aligned but if that is not the case
-     * make sure that it is a multiple of the page size, 4096.
-     * */
-    protected int getBufferSize(int size)
+    protected RandomAccessReader(Builder builder)
     {
-        if ((size & ~4095) != size)
-        { // should already be a page size multiple but if that's not case round it up
-            size = (size + 4095) & ~4095;
-        }
-        return size;
-    }
+        super(null);
 
-    protected ByteBuffer allocateBuffer(int size, BufferType bufferType)
-    {
-        return BufferPool.get(size, bufferType);
-    }
+        this.channel = builder.channel;
+        this.regions = builder.regions;
+        this.limiter = builder.limiter;
+        this.fileLength = builder.overrideLength <= 0 ? builder.channel.size() : builder.overrideLength;
+        this.bufferSize = getBufferSize(builder);
+        this.bufferType = builder.bufferType;
 
-    // A wrapper of the RandomAccessReader that closes the channel when done.
-    // For performance reasons RAR does not increase the reference count of
-    // a channel but assumes the owner will keep it open and close it,
-    // see CASSANDRA-9379, this thin class is just for those cases where we do
-    // not have a shared channel.
-    private static class RandomAccessReaderWithChannel extends RandomAccessReader
-    {
-        @SuppressWarnings("resource")
-        RandomAccessReaderWithChannel(File file)
-        {
-            super(new ChannelProxy(file), DEFAULT_BUFFER_SIZE, -1L, BufferType.OFF_HEAP);
-        }
+        if (builder.bufferSize <= 0)
+            throw new IllegalArgumentException("bufferSize must be positive");
 
-        @Override
-        public void close()
-        {
-            try
-            {
-                super.close();
-            }
-            finally
-            {
-                channel.close();
-            }
-        }
+        if (builder.initializeBuffers)
+            initializeBuffer();
     }
 
-    public static RandomAccessReader open(File file)
+    protected int getBufferSize(Builder builder)
     {
-        return new RandomAccessReaderWithChannel(file);
+        if (builder.limiter == null)
+            return builder.bufferSize;
+
+        // limit to ensure more accurate throttling
+        return Math.min(MAX_THROTTLED_BUFFER_SIZE, builder.bufferSize);
     }
 
-    public static RandomAccessReader open(ChannelProxy channel)
+    protected void initializeBuffer()
     {
-        return open(channel, DEFAULT_BUFFER_SIZE, -1L);
+        if (regions == null)
+            buffer = allocateBuffer(bufferSize);
+        else
+            buffer = regions.floor(0).buffer.duplicate();
+
+        buffer.limit(0);
     }
 
-    public static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize)
+    protected ByteBuffer allocateBuffer(int size)
     {
-        return new RandomAccessReader(channel, bufferSize, overrideSize, BufferType.OFF_HEAP);
+        return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
     }
 
-    public ChannelProxy getChannel()
+    protected void releaseBuffer()
     {
-        return channel;
+        if (buffer != null)
+        {
+            if (regions == null)
+                BufferPool.put(buffer);
+            buffer = null;
+        }
     }
 
     /**
      * Read data from the file starting from the current offset to populate the buffer.
      */
-    protected void reBuffer()
+    public void reBuffer()
+    {
+        if (isEOF())
+            return;
+
+        if (regions == null)
+            reBufferStandard();
+        else
+            reBufferMmap();
+
+        if (limiter != null)
+            limiter.acquire(buffer.remaining());
+
+        assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG ENDIAN byte ordering";
+    }
+
+    protected void reBufferStandard()
     {
         bufferOffset += buffer.position();
-        buffer.clear();
         assert bufferOffset < fileLength;
 
+        buffer.clear();
         long position = bufferOffset;
         long limit = bufferOffset;
 
@@ -145,15 +155,31 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         {
             int n = channel.read(buffer, position);
             if (n < 0)
-                break;
+                throw new FSReadError(new IOException("Unexpected end of file"), channel.filePath());
+
             position += n;
             limit = bufferOffset + buffer.position();
         }
-        if (limit > fileLength)
-            buffer.position((int)(fileLength - bufferOffset));
+
         buffer.flip();
     }
 
+    protected void reBufferMmap()
+    {
+        long position = bufferOffset + buffer.position();
+        assert position < fileLength;
+
+        MmappedRegions.Region region = regions.floor(position);
+        bufferOffset = region.bottom();
+        buffer = region.buffer.duplicate();
+        buffer.position(Ints.checkedCast(position - bufferOffset));
+
+        if (limiter != null && bufferSize < buffer.remaining())
+        { // ensure accurate throttling
+            buffer.limit(buffer.position() + bufferSize);
+        }
+    }
+
     @Override
     public long getFilePointer()
     {
@@ -170,19 +196,23 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         return channel.filePath();
     }
 
-    public int getTotalBufferSize()
+    public ChannelProxy getChannel()
     {
-        //This may NPE so we make a ref
-        //https://issues.apache.org/jira/browse/CASSANDRA-7756
-        ByteBuffer ref = buffer;
-        return ref != null ? ref.capacity() : 0;
+        return channel;
     }
 
-    public void reset()
+    @Override
+    public void reset() throws IOException
     {
         seek(markedPointer);
     }
 
+    @Override
+    public boolean markSupported()
+    {
+        return true;
+    }
+
     public long bytesPastMark()
     {
         long bytes = current() - markedPointer;
@@ -215,7 +245,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
      */
     public boolean isEOF()
     {
-        return getFilePointer() == length();
+        return current() == length();
     }
 
     public long bytesRemaining()
@@ -224,22 +254,29 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
     }
 
     @Override
+    public int available() throws IOException
+    {
+        return Ints.saturatedCast(bytesRemaining());
+    }
+
+    @Override
     public void close()
     {
 	    //make idempotent
         if (buffer == null)
             return;
 
-
         bufferOffset += buffer.position();
-        BufferPool.put(buffer);
-        buffer = null;
+        releaseBuffer();
+
+        //For performance reasons we don't keep a reference to the file
+        //channel, so we don't close it
     }
 
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + "(" + "filePath='" + channel + "')";
+        return getClass().getSimpleName() + "(filePath='" + channel + "')";
     }
 
     /**
@@ -286,93 +323,197 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         assert current() == newPosition;
     }
 
-    // -1 will be returned if there is nothing to read; higher-level methods like readInt
-    // or readFully (from RandomAccessFile) will throw EOFException but this should not
-    public int read()
+    /**
+     * Reads a line of text form the current position in this file. A line is
+     * represented by zero or more characters followed by {@code '\n'}, {@code
+     * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
+     * include the line terminating sequence.
+     * <p/>
+     * Blocks until a line terminating sequence has been read, the end of the
+     * file is reached or an exception is thrown.
+     *
+     * @return the contents of the line or {@code null} if no characters have
+     * been read before the end of the file has been reached.
+     * @throws IOException if this file is closed or another I/O error occurs.
+     */
+    public final String readLine() throws IOException
     {
-        if (buffer == null)
-            throw new AssertionError("Attempted to read from closed RAR");
-
-        if (isEOF())
-            return -1; // required by RandomAccessFile
-
-        if (!buffer.hasRemaining())
-            reBuffer();
+        StringBuilder line = new StringBuilder(80); // Typical line length
+        boolean foundTerminator = false;
+        long unreadPosition = -1;
+        while (true)
+        {
+            int nextByte = read();
+            switch (nextByte)
+            {
+                case -1:
+                    return line.length() != 0 ? line.toString() : null;
+                case (byte) '\r':
+                    if (foundTerminator)
+                    {
+                        seek(unreadPosition);
+                        return line.toString();
+                    }
+                    foundTerminator = true;
+                    /* Have to be able to peek ahead one byte */
+                    unreadPosition = getPosition();
+                    break;
+                case (byte) '\n':
+                    return line.toString();
+                default:
+                    if (foundTerminator)
+                    {
+                        seek(unreadPosition);
+                        return line.toString();
+                    }
+                    line.append((char) nextByte);
+            }
+        }
+    }
 
-        return (int)buffer.get() & 0xff;
+    public long length()
+    {
+        return fileLength;
     }
 
-    @Override
-    public int read(byte[] buffer)
+    public long getPosition()
     {
-        return read(buffer, 0, buffer.length);
+        return current();
     }
 
-    @Override
-    // -1 will be returned if there is nothing to read; higher-level methods like readInt
-    // or readFully (from RandomAccessFile) will throw EOFException but this should not
-    public int read(byte[] buff, int offset, int length)
+    public static class Builder
     {
-        if (buffer == null)
-            throw new IllegalStateException("Attempted to read from closed RAR");
+        // The NIO file channel or an empty channel
+        public final ChannelProxy channel;
 
-        if (length == 0)
-            return 0;
+        // We override the file length when we open sstables early, so that we do not
+        // read past the early mark
+        public long overrideLength;
 
-        if (isEOF())
-            return -1;
+        // The size of the buffer for buffered readers
+        public int bufferSize;
 
-        if (!buffer.hasRemaining())
-            reBuffer();
+        // The type of the buffer for buffered readers
+        public BufferType bufferType;
 
-        int toCopy = Math.min(length, buffer.remaining());
-        buffer.get(buff, offset, toCopy);
-        return toCopy;
-    }
+        // The mmap segments for mmap readers
+        public MmappedRegions regions;
 
-    public ByteBuffer readBytes(int length) throws EOFException
-    {
-        assert length >= 0 : "buffer length should not be negative: " + length;
+        // An optional limiter that will throttle the amount of data we read
+        public RateLimiter limiter;
 
-        if (buffer == null)
-            throw new IllegalStateException("Attempted to read from closed RAR");
+        public boolean initializeBuffers;
 
-        try
+        public Builder(ChannelProxy channel)
         {
-            ByteBuffer result = ByteBuffer.allocate(length);
-            while (result.hasRemaining())
-            {
-                if (isEOF())
-                    throw new EOFException();
-                if (!buffer.hasRemaining())
-                    reBuffer();
-                ByteBufferUtil.put(buffer, result);
+            this.channel = channel;
+            this.overrideLength = -1L;
+            this.bufferSize = getBufferSize(DEFAULT_BUFFER_SIZE);
+            this.bufferType = BufferType.OFF_HEAP;
+            this.regions = null;
+            this.limiter = null;
+            this.initializeBuffers = true;
+        }
+
+        /** The buffer size is typically already page aligned but if that is not the case
+         * make sure that it is a multiple of the page size, 4096.
+         * */
+        private static int getBufferSize(int size)
+        {
+            if ((size & ~4095) != size)
+            { // should already be a page size multiple but if that's not the case, round it up
+                size = (size + 4095) & ~4095;
             }
-            result.flip();
-            return result;
+            return size;
+        }
+
+        public Builder overrideLength(long overrideLength)
+        {
+            if (overrideLength > channel.size())
+                throw new IllegalArgumentException("overrideLength cannot be more than the file size");
+
+            this.overrideLength = overrideLength;
+            return this;
         }
-        catch (EOFException e)
+
+        public Builder bufferSize(int bufferSize)
         {
-            throw e;
+            if (bufferSize <= 0)
+                throw new IllegalArgumentException("bufferSize must be positive");
+
+            this.bufferSize = getBufferSize(bufferSize);
+            return this;
         }
-        catch (Exception e)
+
+        public Builder bufferType(BufferType bufferType)
         {
-            throw new FSReadError(e, channel.toString());
+            this.bufferType = bufferType;
+            return this;
+        }
+
+        public Builder regions(MmappedRegions regions)
+        {
+            this.regions = regions;
+            return this;
+        }
+
+        public Builder limiter(RateLimiter limiter)
+        {
+            this.limiter = limiter;
+            return this;
+        }
+
+        public Builder initializeBuffers(boolean initializeBuffers)
+        {
+            this.initializeBuffers = initializeBuffers;
+            return this;
+        }
+
+        public RandomAccessReader build()
+        {
+            return new RandomAccessReader(this);
+        }
+
+        public RandomAccessReader buildWithChannel()
+        {
+            return new RandomAccessReaderWithOwnChannel(this);
         }
     }
 
-    public long length()
+    // A wrapper of the RandomAccessReader that closes the channel when done.
+    // For performance reasons RAR does not increase the reference count of
+    // a channel but assumes the owner will keep it open and close it,
+    // see CASSANDRA-9379, this thin class is just for those cases where we do
+    // not have a shared channel.
+    public static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
     {
-        return fileLength;
+        protected RandomAccessReaderWithOwnChannel(Builder builder)
+        {
+            super(builder);
+        }
+
+        @Override
+        public void close()
+        {
+            try
+            {
+                super.close();
+            }
+            finally
+            {
+                channel.close();
+            }
+        }
     }
 
-    public long getPosition()
+    @SuppressWarnings("resource")
+    public static RandomAccessReader open(File file)
     {
-        return bufferOffset + (buffer == null ? 0 : buffer.position());
+        return new Builder(new ChannelProxy(file)).buildWithChannel();
     }
 
-    public long getPositionLimit()
+    public static RandomAccessReader open(ChannelProxy channel)
     {
-        return length();
+        return new Builder(channel).build();
     }
 }

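The Builder above replaces the old open(channel, bufferSize, overrideSize) overloads and the separate ThrottledReader class (deleted further down). A minimal usage sketch, purely illustrative: the file path, buffer size and rate below are made up and not part of the commit.

    import java.io.File;

    import com.google.common.util.concurrent.RateLimiter;

    import org.apache.cassandra.io.util.ChannelProxy;
    import org.apache.cassandra.io.util.RandomAccessReader;

    public class RarBuilderSketch
    {
        public static void main(String[] args) throws Exception
        {
            File file = new File("/tmp/example-Data.db"); // hypothetical path

            // Simplest case: the reader owns and closes its own channel.
            try (RandomAccessReader simple = RandomAccessReader.open(file))
            {
                while (!simple.isEOF())
                    simple.readByte();
            }

            // Shared-channel case with throttling: the caller owns the channel
            // and must close it after the reader (CASSANDRA-9379).
            ChannelProxy channel = new ChannelProxy(file);
            try (RandomAccessReader throttled = new RandomAccessReader.Builder(channel)
                                                .bufferSize(1 << 16)                   // already a multiple of 4096
                                                .limiter(RateLimiter.create(16 << 20)) // ~16 MB/s
                                                .build())
            {
                while (!throttled.isEOF())
                    throttled.readByte();
            }
            finally
            {
                channel.close();
            }
        }
    }

Note that when a limiter is set, getBufferSize(Builder) clamps the buffer to MAX_THROTTLED_BUFFER_SIZE (64k), so each limiter.acquire() in reBuffer() requests a reasonably small number of permits.
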
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
new file mode 100644
index 0000000..7d64f3d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import net.nicoulaj.compilecommand.annotations.Print;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
+ * via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc).
+ *
+ * RebufferingInputStream is not thread safe.
+ */
+public abstract class RebufferingInputStream extends InputStream implements DataInputPlus, Closeable
+{
+    protected ByteBuffer buffer;
+
+    protected RebufferingInputStream(ByteBuffer buffer)
+    {
+        Preconditions.checkArgument(buffer == null || buffer.order() == ByteOrder.BIG_ENDIAN, "Buffer must have BIG ENDIAN byte ordering");
+        this.buffer = buffer;
+    }
+
+    /**
+     * Implementations must implement this method to refill the buffer.
+     * They can expect the buffer to be empty when this method is invoked.
+     * @throws IOException
+     */
+    protected abstract void reBuffer() throws IOException;
+
+    @Override
+    public void readFully(byte[] b) throws IOException
+    {
+        readFully(b, 0, b.length);
+    }
+
+    @Override
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        int read = read(b, off, len);
+        if (read < len)
+            throw new EOFException();
+    }
+
+    @Print
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        // avoid int overflow
+        if (off < 0 || off > b.length || len < 0 || len > b.length - off)
+            throw new IndexOutOfBoundsException();
+
+        if (len == 0)
+            return 0;
+
+        int copied = 0;
+        while (copied < len)
+        {
+            int position = buffer.position();
+            int remaining = buffer.limit() - position;
+            if (remaining == 0)
+            {
+                reBuffer();
+                position = buffer.position();
+                remaining = buffer.limit() - position;
+                if (remaining == 0)
+                    return copied == 0 ? -1 : copied;
+            }
+            int toCopy = Math.min(len - copied, remaining);
+            FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
+            buffer.position(position + toCopy);
+            copied += toCopy;
+        }
+
+        return copied;
+    }
+
+    @DontInline
+    protected long readPrimitiveSlowly(int bytes) throws IOException
+    {
+        long result = 0;
+        for (int i = 0; i < bytes; i++)
+            result = (result << 8) | (readByte() & 0xFFL);
+        return result;
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException
+    {
+        int skipped = 0;
+
+        while (skipped < n)
+        {
+            int skippedThisTime = (int)skip(n - skipped);
+            if (skippedThisTime <= 0) break;
+            skipped += skippedThisTime;
+        }
+
+        return skipped;
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException
+    {
+        return readByte() != 0;
+    }
+
+    @Override
+    public byte readByte() throws IOException
+    {
+        if (!buffer.hasRemaining())
+        {
+            reBuffer();
+            if (!buffer.hasRemaining())
+                throw new EOFException();
+        }
+
+        return buffer.get();
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException
+    {
+        return readByte() & 0xff;
+    }
+
+    @Override
+    public short readShort() throws IOException
+    {
+        if (buffer.remaining() >= 2)
+            return buffer.getShort();
+        else
+            return (short) readPrimitiveSlowly(2);
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException
+    {
+        return readShort() & 0xFFFF;
+    }
+
+    @Override
+    public char readChar() throws IOException
+    {
+        if (buffer.remaining() >= 2)
+            return buffer.getChar();
+        else
+            return (char) readPrimitiveSlowly(2);
+    }
+
+    @Override
+    public int readInt() throws IOException
+    {
+        if (buffer.remaining() >= 4)
+            return buffer.getInt();
+        else
+            return (int) readPrimitiveSlowly(4);
+    }
+
+    @Override
+    public long readLong() throws IOException
+    {
+        if (buffer.remaining() >= 8)
+            return buffer.getLong();
+        else
+            return readPrimitiveSlowly(8);
+    }
+
+    public long readVInt() throws IOException
+    {
+        return VIntCoding.decodeZigZag64(readUnsignedVInt());
+    }
+
+    public long readUnsignedVInt() throws IOException
+    {
+        //If 9 bytes aren't available use the slow path in VIntCoding
+        if (buffer.remaining() < 9)
+            return VIntCoding.readUnsignedVInt(this);
+
+        byte firstByte = buffer.get();
+
+        //Bail out early if this is a single byte; necessary, or the fast path below fails
+        if (firstByte >= 0)
+            return firstByte;
+
+        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
+
+        int position = buffer.position();
+        int extraBits = extraBytes * 8;
+
+        long retval = buffer.getLong(position);
+        if (buffer.order() == ByteOrder.LITTLE_ENDIAN)
+            retval = Long.reverseBytes(retval);
+        buffer.position(position + extraBytes);
+
+        // truncate the bytes we read in excess of those we needed
+        retval >>>= 64 - extraBits;
+        // remove the non-value bits from the first byte
+        firstByte &= VIntCoding.firstByteValueMask(extraBytes);
+        // shift the first byte up to its correct position
+        retval |= (long) firstByte << extraBits;
+        return retval;
+    }
+
+    @Override
+    public float readFloat() throws IOException
+    {
+        if (buffer.remaining() >= 4)
+            return buffer.getFloat();
+        else
+            return Float.intBitsToFloat((int)readPrimitiveSlowly(4));
+    }
+
+    @Override
+    public double readDouble() throws IOException
+    {
+        if (buffer.remaining() >= 8)
+            return buffer.getDouble();
+        else
+            return Double.longBitsToDouble(readPrimitiveSlowly(8));
+    }
+
+    @Override
+    public String readLine() throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        return DataInputStream.readUTF(this);
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        try
+        {
+            return readUnsignedByte();
+        }
+        catch (EOFException ex)
+        {
+            return -1;
+        }
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+}

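RebufferingInputStream leaves only reBuffer() to subclasses: it is called when the buffer is empty, and leaving the buffer empty signals end of stream. Below is an illustrative minimal subclass over a single heap buffer, roughly the role DataInputBuffer plays in the tests further down; the class name and values are made up for illustration.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    import org.apache.cassandra.io.util.RebufferingInputStream;

    public class SingleBufferInput extends RebufferingInputStream
    {
        public SingleBufferInput(byte[] bytes)
        {
            // the superclass requires big-endian ordering
            super(ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN));
        }

        @Override
        protected void reBuffer() throws IOException
        {
            // nothing to refill from; the buffer stays empty, which the
            // read methods interpret as end of stream
        }

        public static void main(String[] args) throws IOException
        {
            ByteBuffer src = ByteBuffer.allocate(12);
            src.putInt(42).putLong(Long.MAX_VALUE);
            src.flip();

            byte[] bytes = new byte[src.remaining()];
            src.get(bytes);

            try (SingleBufferInput in = new SingleBufferInput(bytes))
            {
                System.out.println(in.readInt());  // 42
                System.out.println(in.readLong()); // 9223372036854775807
            }
        }
    }

Also worth noting from the code above: readVInt/readUnsignedVInt fall back to VIntCoding's byte-by-byte path whenever fewer than 9 bytes remain buffered, because the fast path speculatively reads a full long.
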
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index e586682..c2a2374 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -21,23 +21,19 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
 
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.io.sstable.IndexSummaryBuilder;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
 
@@ -79,7 +75,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         this.onDiskLength = onDiskLength;
     }
 
-    public SegmentedFile(SegmentedFile copy)
+    protected SegmentedFile(SegmentedFile copy)
     {
         super(copy);
         channel = copy.channel;
@@ -93,7 +89,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         return channel.filePath();
     }
 
-    protected static abstract class Cleanup implements RefCounted.Tidy
+    protected static class Cleanup implements RefCounted.Tidy
     {
         final ChannelProxy channel;
         protected Cleanup(ChannelProxy channel)
@@ -116,16 +112,22 @@ public abstract class SegmentedFile extends SharedCloseableImpl
 
     public RandomAccessReader createReader()
     {
-        return RandomAccessReader.open(channel, bufferSize, length);
+        return new RandomAccessReader.Builder(channel)
+               .overrideLength(length)
+               .bufferSize(bufferSize)
+               .build();
     }
 
-    public RandomAccessReader createThrottledReader(RateLimiter limiter)
+    public RandomAccessReader createReader(RateLimiter limiter)
     {
-        assert limiter != null;
-        return ThrottledReader.open(channel, bufferSize, length, limiter);
+        return new RandomAccessReader.Builder(channel)
+               .overrideLength(length)
+               .bufferSize(bufferSize)
+               .limiter(limiter)
+               .build();
     }
 
-    public FileDataInput getSegment(long position)
+    public FileDataInput createReader(long position)
     {
         RandomAccessReader reader = createReader();
         reader.seek(position);
@@ -153,14 +155,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl
     }
 
     /**
-     * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
-     */
-    public Iterator<FileDataInput> iterator(long position)
-    {
-        return new SegmentIterator(position);
-    }
-
-    /**
      * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
      */
     public static abstract class Builder implements AutoCloseable
@@ -168,13 +162,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         private ChannelProxy channel;
 
         /**
-         * Adds a position that would be a safe place for a segment boundary in the file. For a block/row based file
-         * format, safe boundaries are block/row edges.
-         * @param boundary The absolute position of the potential boundary in the file.
-         */
-        public abstract void addPotentialBoundary(long boundary);
-
-        /**
          * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
          * @param channel The channel to the file on disk.
          */
@@ -214,12 +201,12 @@ public abstract class SegmentedFile extends SharedCloseableImpl
             return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L);
         }
 
-        private int bufferSize(StatsMetadata stats)
+        private static int bufferSize(StatsMetadata stats)
         {
             return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
         }
 
-        private int bufferSize(Descriptor desc, IndexSummary indexSummary)
+        private static int bufferSize(Descriptor desc, IndexSummary indexSummary)
         {
             File file = new File(desc.filenameFor(Component.PRIMARY_INDEX));
             return bufferSize(file.length() / indexSummary.size());
@@ -267,13 +254,19 @@ public abstract class SegmentedFile extends SharedCloseableImpl
             return (int)Math.min(size, 1 << 16);
         }
 
-        public void serializeBounds(DataOutput out) throws IOException
+        public void serializeBounds(DataOutput out, Version version) throws IOException
         {
+            if (!version.hasBoundaries())
+                return;
+
             out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
         }
 
-        public void deserializeBounds(DataInput in) throws IOException
+        public void deserializeBounds(DataInput in, Version version) throws IOException
         {
+            if (!version.hasBoundaries())
+                return;
+
             if (!in.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name()))
                 throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
         }
@@ -282,6 +275,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         {
             if (channel != null)
                 return channel.close(accumulate);
+
             return accumulate;
         }
 
@@ -294,6 +288,10 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         {
             if (channel != null)
             {
+                // This is really fragile: both path and channel.filePath()
+                // must agree, i.e. they must both be absolute or both relative.
+                // Eventually we should pass the filePath to the builder
+                // constructor and remove this.
                 if (channel.filePath().equals(path))
                     return channel.sharedCopy();
                 else
@@ -305,61 +303,10 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         }
     }
 
-    static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment>
-    {
-        public Segment(long offset, MappedByteBuffer segment)
-        {
-            super(offset, segment);
-        }
-
-        public final int compareTo(Segment that)
-        {
-            return (int)Math.signum(this.left - that.left);
-        }
-    }
-
-    /**
-     * A lazy Iterator over segments in forward order from the given position.  It is caller's responsibility
-     * to close the FileDataIntputs when finished.
-     */
-    final class SegmentIterator implements Iterator<FileDataInput>
-    {
-        private long nextpos;
-        public SegmentIterator(long position)
-        {
-            this.nextpos = position;
-        }
-
-        public boolean hasNext()
-        {
-            return nextpos < length;
-        }
-
-        public FileDataInput next()
-        {
-            long position = nextpos;
-            if (position >= length)
-                throw new NoSuchElementException();
-
-            FileDataInput segment = getSegment(nextpos);
-            try
-            {
-                nextpos = nextpos + segment.bytesRemaining();
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, path());
-            }
-            return segment;
-        }
-
-        public void remove() { throw new UnsupportedOperationException(); }
-    }
-
     @Override
     public String toString() {
-        return getClass().getSimpleName() + "(path='" + path() + "'" +
+        return getClass().getSimpleName() + "(path='" + path() + '\'' +
                ", length=" + length +
-               ")";
+               ')';
 }
 }

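With SegmentIterator and getSegment(position) gone, code that previously walked segments can obtain one positioned reader and read sequentially; reBufferMmap() in RandomAccessReader moves across mmapped regions transparently. A hedged sketch of the calling pattern (segmentedFile and the starting offset are placeholders, not taken from the commit):

    import java.io.IOException;

    import org.apache.cassandra.io.util.FileDataInput;
    import org.apache.cassandra.io.util.SegmentedFile;

    public class ScanFromSketch
    {
        // Reads every remaining byte starting at the given offset.
        static void scanFrom(SegmentedFile segmentedFile, long position) throws IOException
        {
            try (FileDataInput in = segmentedFile.createReader(position))
            {
                while (!in.isEOF())
                    in.readByte();
            }
        }
    }
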
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
deleted file mode 100644
index 024d38f..0000000
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.cassandra.io.util;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.io.compress.BufferType;
-
-public class ThrottledReader extends RandomAccessReader
-{
-    private final RateLimiter limiter;
-
-    protected ThrottledReader(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter)
-    {
-        super(channel, bufferSize, overrideLength, BufferType.OFF_HEAP);
-        this.limiter = limiter;
-    }
-
-    protected void reBuffer()
-    {
-        limiter.acquire(buffer.capacity());
-        super.reBuffer();
-    }
-
-    public static ThrottledReader open(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter)
-    {
-        return new ThrottledReader(channel, bufferSize, overrideLength, limiter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 6b24707..adbd091 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -55,7 +55,7 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
-        try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel().sharedCopy())
+        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
         {
             long progress = 0L;
             // calculate chunks to transfer. we want to send continuous chunks altogether.
@@ -72,13 +72,7 @@ public class CompressedStreamWriter extends StreamWriter
                     final long bytesTransferredFinal = bytesTransferred;
                     final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                     limiter.acquire(toTransfer);
-                    long lastWrite = out.applyToChannel(new Function<WritableByteChannel, Long>()
-                    {
-                        public Long apply(WritableByteChannel wbc)
-                        {
-                            return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc);
-                        }
-                    });
+                    long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
                     bytesTransferred += lastWrite;
                     progress += lastWrite;
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index abc2a37..a05c3c8 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -398,9 +398,6 @@ public class ByteBufferUtil
         if (length == 0)
             return EMPTY_BYTE_BUFFER;
 
-        if (in instanceof FileDataInput)
-            return ((FileDataInput) in).readBytes(length);
-
         byte[] buff = new byte[length];
         in.readFully(buff);
         return ByteBuffer.wrap(buff);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index d6ce7b4..a5170ff 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -82,29 +82,54 @@ public final class Throwables
     @SuppressWarnings("unchecked")
     public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E
     {
-        Throwable fail = null;
-        Iterator<DiscreteAction<? extends E>> iter = actions.iterator();
-        while (iter.hasNext())
+        Throwable fail = perform(null, actions);
+        if (failIfCanCast(fail, null))
+            throw (E) fail;
+    }
+
+    public static Throwable perform(Throwable accumulate, Stream<? extends DiscreteAction<?>> actions)
+    {
+        return perform(accumulate, actions.iterator());
+    }
+
+    public static Throwable perform(Throwable accumulate, Iterator<? extends DiscreteAction<?>> actions)
+    {
+        while (actions.hasNext())
         {
-            DiscreteAction<? extends E> action = iter.next();
+            DiscreteAction<?> action = actions.next();
             try
             {
                 action.perform();
             }
             catch (Throwable t)
             {
-                fail = merge(fail, t);
+                accumulate = merge(accumulate, t);
             }
         }
-
-        if (failIfCanCast(fail, null))
-            throw (E) fail;
+        return accumulate;
     }
 
     @SafeVarargs
     public static void perform(File against, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
     {
-        perform(Arrays.stream(actions).map((action) -> () ->
+        perform(against.getPath(), opType, actions);
+    }
+
+    @SafeVarargs
+    public static void perform(String filePath, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
+    {
+        maybeFail(perform(null, filePath, opType, actions));
+    }
+
+    @SafeVarargs
+    public static Throwable perform(Throwable accumulate, String filePath, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
+    {
+        return perform(accumulate, filePath, opType, Arrays.stream(actions));
+    }
+
+    public static Throwable perform(Throwable accumulate, String filePath, FileOpType opType, Stream<DiscreteAction<? extends IOException>> actions)
+    {
+        return perform(accumulate, actions.map((action) -> () ->
         {
             try
             {
@@ -112,7 +137,7 @@ public final class Throwables
             }
             catch (IOException e)
             {
-                throw (opType == FileOpType.WRITE) ? new FSWriteError(e, against) : new FSReadError(e, against);
+                throw (opType == FileOpType.WRITE) ? new FSWriteError(e, filePath) : new FSReadError(e, filePath);
             }
         }));
     }

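The new overloads let callers thread an accumulated Throwable through a sequence of cleanup actions instead of throwing on the first failure. An illustrative sketch, assuming FileOpType and maybeFail are the nested enum and helper already used inside Throwables, with a made-up file path and resource:

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;

    import org.apache.cassandra.utils.Throwables;

    public class CleanupSketch
    {
        // Runs both actions even if the first throws; any failure is wrapped
        // as an FSWriteError against filePath and merged into "accumulate".
        static Throwable closeQuietly(Throwable accumulate, String filePath, OutputStream out)
        {
            return Throwables.perform(accumulate, filePath, Throwables.FileOpType.WRITE,
                                      out::flush,
                                      out::close);
        }

        public static void main(String[] args)
        {
            Throwable fail = null;
            fail = closeQuietly(fail, "/tmp/example.db", new ByteArrayOutputStream());
            Throwables.maybeFail(fail); // rethrows the merged failure, if any
        }
    }
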
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index 0ac4124..daf5006 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -67,7 +67,7 @@ public class VIntCoding
             return firstByte;
 
         int size = numberOfExtraBytesToRead(firstByte);
-        long retval = firstByte & firstByteValueMask(size);;
+        long retval = firstByte & firstByteValueMask(size);
         for (int ii = 0; ii < size; ii++)
         {
             byte b = input.readByte();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index fcdab62..555cdda 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -60,8 +60,7 @@ import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -535,13 +534,12 @@ public class CommitLogTest
     {
         ByteBuffer buf = ByteBuffer.allocate(1024);
         CommitLogDescriptor.writeHeader(buf, desc);
-        long length = buf.position();
         // Put some extra data in the stream.
         buf.putDouble(0.1);
         buf.flip();
-        FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
+
+        DataInputBuffer input = new DataInputBuffer(buf, false);
         CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
         Assert.assertEquals("Descriptors", desc, read);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 1ab9ca7..fea83c1 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
 
 /**
  * Utility class for tests needing to examine the commitlog contents.
@@ -60,7 +61,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
     @Override
     void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
     {
-        NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+        RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
         Mutation mutation;
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
index e431924..323a12d 100644
--- a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
+++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
@@ -17,96 +17,217 @@
  */
 package org.apache.cassandra.hints;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.zip.CRC32;
 
 import org.junit.Test;
 
-import org.apache.cassandra.hints.ChecksummedDataInput;
-import org.apache.cassandra.io.util.AbstractDataInput;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class ChecksummedDataInputTest
 {
     @Test
-    public void testThatItWorks() throws IOException
+    public void testReadMethods() throws IOException
     {
+        // Make sure this array is bigger than the reader buffer size
+        // so we test updating the crc across buffer boundaries
+        byte[] b = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE * 2];
+        for (int i = 0; i < b.length; i++)
+            b[i] = (byte)i;
+
+        ByteBuffer buffer;
+
         // fill a bytebuffer with some input
-        DataOutputBuffer out = new DataOutputBuffer();
-        out.write(127);
-        out.write(new byte[]{ 0, 1, 2, 3, 4, 5, 6 });
-        out.writeBoolean(false);
-        out.writeByte(10);
-        out.writeChar('t');
-        out.writeDouble(3.3);
-        out.writeFloat(2.2f);
-        out.writeInt(42);
-        out.writeLong(Long.MAX_VALUE);
-        out.writeShort(Short.MIN_VALUE);
-        out.writeUTF("utf");
-        ByteBuffer buffer = out.buffer();
-
-        // calculate resulting CRC
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            out.write(127);
+            out.write(b);
+            out.writeBoolean(false);
+            out.writeByte(10);
+            out.writeChar('t');
+            out.writeDouble(3.3);
+            out.writeFloat(2.2f);
+            out.writeInt(42);
+            out.writeLong(Long.MAX_VALUE);
+            out.writeShort(Short.MIN_VALUE);
+            out.writeUTF("utf");
+            out.writeVInt(67L);
+            out.writeUnsignedVInt(88L);
+            out.writeBytes("abcdefghi");
+
+            buffer = out.buffer();
+        }
+
+        // calculate expected CRC
         CRC32 crc = new CRC32();
         FBUtilities.updateChecksum(crc, buffer);
-        int expectedCRC = (int) crc.getValue();
-
-        ChecksummedDataInput crcInput = ChecksummedDataInput.wrap(new DummyByteBufferDataInput(buffer.duplicate()));
-        crcInput.limit(buffer.remaining());
-
-        // assert that we read all the right values back
-        assertEquals(127, crcInput.read());
-        byte[] bytes = new byte[7];
-        crcInput.readFully(bytes);
-        assertTrue(Arrays.equals(new byte[]{ 0, 1, 2, 3, 4, 5, 6 }, bytes));
-        assertEquals(false, crcInput.readBoolean());
-        assertEquals(10, crcInput.readByte());
-        assertEquals('t', crcInput.readChar());
-        assertEquals(3.3, crcInput.readDouble());
-        assertEquals(2.2f, crcInput.readFloat());
-        assertEquals(42, crcInput.readInt());
-        assertEquals(Long.MAX_VALUE, crcInput.readLong());
-        assertEquals(Short.MIN_VALUE, crcInput.readShort());
-        assertEquals("utf", crcInput.readUTF());
-
-        // assert that the crc matches, and that we've read exactly as many bytes as expected
-        assertEquals(0, crcInput.bytesRemaining());
-        assertEquals(expectedCRC, crcInput.getCrc());
+
+        // save the buffer to file to create a RAR
+        File file = File.createTempFile("testReadMethods", "1");
+        file.deleteOnExit();
+        try (SequentialWriter writer = SequentialWriter.open(file))
+        {
+            writer.write(buffer);
+            writer.writeInt((int) crc.getValue());
+            writer.finish();
+        }
+
+        assertTrue(file.exists());
+        assertEquals(buffer.remaining() + 4, file.length());
+
+        try (ChecksummedDataInput reader = ChecksummedDataInput.open(file))
+        {
+            reader.limit(buffer.remaining() + 4);
+
+            // assert that we read all the right values back
+            assertEquals(127, reader.read());
+            byte[] bytes = new byte[b.length];
+            reader.readFully(bytes);
+            assertTrue(Arrays.equals(bytes, b));
+            assertEquals(false, reader.readBoolean());
+            assertEquals(10, reader.readByte());
+            assertEquals('t', reader.readChar());
+            assertEquals(3.3, reader.readDouble());
+            assertEquals(2.2f, reader.readFloat());
+            assertEquals(42, reader.readInt());
+            assertEquals(Long.MAX_VALUE, reader.readLong());
+            assertEquals(Short.MIN_VALUE, reader.readShort());
+            assertEquals("utf", reader.readUTF());
+            assertEquals(67L, reader.readVInt());
+            assertEquals(88L, reader.readUnsignedVInt());
+            assertEquals("abcdefghi", new String(ByteBufferUtil.read(reader, 9).array(), StandardCharsets.UTF_8));
+
+            // assert that the crc matches, and that we've read exactly as many bytes as expected
+            assertTrue(reader.checkCrc());
+            assertEquals(0, reader.bytesRemaining());
+
+            reader.checkLimit(0);
+        }
     }
 
-    private static final class DummyByteBufferDataInput extends AbstractDataInput
+    @Test
+    public void testResetCrc() throws IOException
     {
-        private final ByteBuffer buffer;
+        CRC32 crc = new CRC32();
+        ByteBuffer buffer;
+
+        // fill a bytebuffer with some input
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            out.write(127);
+            out.writeBoolean(false);
+            out.writeByte(10);
+            out.writeChar('t');
+
+            buffer = out.buffer();
+            FBUtilities.updateChecksum(crc, buffer);
+            out.writeInt((int) crc.getValue());
+
+            int bufferPos = out.getLength();
+            out.writeDouble(3.3);
+            out.writeFloat(2.2f);
+            out.writeInt(42);
+
+            buffer = out.buffer();
+            buffer.position(bufferPos);
+            crc.reset();
+            FBUtilities.updateChecksum(crc, buffer);
+
+            out.writeInt((int) crc.getValue());
+            buffer = out.buffer();
+        }
 
-        DummyByteBufferDataInput(ByteBuffer buffer)
+        // save the buffer to file to create a RAR
+        File file = File.createTempFile("testResetCrc", "1");
+        file.deleteOnExit();
+        try (SequentialWriter writer = SequentialWriter.open(file))
         {
-            this.buffer = buffer;
+            writer.write(buffer);
+            writer.finish();
         }
 
-        public void seek(long position)
+        assertTrue(file.exists());
+        assertEquals(buffer.remaining(), file.length());
+
+        try (ChecksummedDataInput reader = ChecksummedDataInput.open(file))
         {
-            throw new UnsupportedOperationException();
+            reader.limit(buffer.remaining());
+
+            // assert that we read all the right values back
+            assertEquals(127, reader.read());
+            assertEquals(false, reader.readBoolean());
+            assertEquals(10, reader.readByte());
+            assertEquals('t', reader.readChar());
+            assertTrue(reader.checkCrc());
+
+            reader.resetCrc();
+            assertEquals(3.3, reader.readDouble(), 0.0);
+            assertEquals(2.2f, reader.readFloat(), 0.0f);
+            assertEquals(42, reader.readInt());
+            assertTrue(reader.checkCrc());
+            assertEquals(0, reader.bytesRemaining());
         }
+    }
 
-        public long getPosition()
+    @Test
+    public void testFailedCrc() throws IOException
+    {
+        CRC32 crc = new CRC32();
+        ByteBuffer buffer;
+
+        // fill a bytebuffer with some input
+        try (DataOutputBuffer out = new DataOutputBuffer())
         {
-            throw new UnsupportedOperationException();
+            out.write(127);
+            out.writeBoolean(false);
+            out.writeByte(10);
+            out.writeChar('t');
+
+            buffer = out.buffer();
+            FBUtilities.updateChecksum(crc, buffer);
+
+            // update twice so it won't match
+            FBUtilities.updateChecksum(crc, buffer);
+            out.writeInt((int) crc.getValue());
+
+            buffer = out.buffer();
         }
 
-        public long getPositionLimit()
+        // save the buffer to file to create a RAR
+        File file = File.createTempFile("testFailedCrc", "1");
+        file.deleteOnExit();
+        try (SequentialWriter writer = SequentialWriter.open(file))
         {
-            throw new UnsupportedOperationException();
+            writer.write(buffer);
+            writer.finish();
         }
 
-        public int read()
+        assertTrue(file.exists());
+        assertEquals(buffer.remaining(), file.length());
+
+        try (ChecksummedDataInput reader = ChecksummedDataInput.open(file))
         {
-            return buffer.get() & 0xFF;
+            reader.limit(buffer.remaining());
+
+            // assert that we read all the right values back
+            assertEquals(127, reader.read());
+            assertEquals(false, reader.readBoolean());
+            assertEquals(10, reader.readByte());
+            assertEquals('t', reader.readChar());
+            assertFalse(reader.checkCrc());
+            assertEquals(0, reader.bytesRemaining());
         }
     }
 }

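The tests above all rely on the same framing: a payload is written, then the int value of a CRC32 computed over that payload is appended, and checkCrc() should succeed only when the recomputed checksum matches that trailer. The sketch below restates that framing with nothing but JDK classes; it is illustrative only, the class and method names (CrcFramingSketch, frame, verify) are made up for the example, and it does not use the ChecksummedDataInput, DataOutputBuffer or FBUtilities APIs touched by this patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Illustrative only: payload followed by a 4-byte CRC32 trailer, as the tests above build.
final class CrcFramingSketch
{
    // write the payload, then the int value of a CRC32 computed over it
    static byte[] frame(byte[] payload) throws IOException
    {
        CRC32 crc = new CRC32();
        crc.update(payload);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.write(payload);
            out.writeInt((int) crc.getValue()); // trailer, analogous to writer.writeInt above
        }
        return bytes.toByteArray();
    }

    // recompute the CRC over the payload and compare it with the stored trailer,
    // which is the property the tests assert via checkCrc()
    static boolean verify(byte[] framed) throws IOException
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed)))
        {
            byte[] payload = new byte[framed.length - 4];
            in.readFully(payload);

            CRC32 crc = new CRC32();
            crc.update(payload);
            return in.readInt() == (int) crc.getValue();
        }
    }
}

With this sketch, verify(frame(payload)) returns true, while corrupting any byte of the framed array makes it return false, which is the mismatch testFailedCrc provokes by updating the checksum twice before writing it.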
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
deleted file mode 100644
index c1e43c9..0000000
--- a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
-import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
-
-public class ChecksummedRandomAccessReaderTest
-{
-    @Test
-    public void readFully() throws IOException
-    {
-        final File data = File.createTempFile("testReadFully", "data");
-        final File crc = File.createTempFile("testReadFully", "crc");
-
-        final byte[] expected = new byte[70 * 1024];   // bit more than crc chunk size, so we can test rebuffering.
-        ThreadLocalRandom.current().nextBytes(expected);
-
-        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
-        writer.write(expected);
-        writer.finish();
-
-        assert data.exists();
-
-        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
-        byte[] b = new byte[expected.length];
-        reader.readFully(b);
-
-        assertArrayEquals(expected, b);
-
-        assertTrue(reader.isEOF());
-
-        reader.close();
-    }
-
-    @Test
-    public void seek() throws IOException
-    {
-        final File data = File.createTempFile("testSeek", "data");
-        final File crc = File.createTempFile("testSeek", "crc");
-
-        final byte[] dataBytes = new byte[70 * 1024];   // bit more than crc chunk size
-        ThreadLocalRandom.current().nextBytes(dataBytes);
-
-        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
-        writer.write(dataBytes);
-        writer.finish();
-
-        assert data.exists();
-
-        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
-
-        final int seekPosition = 66000;
-        reader.seek(seekPosition);
-
-        byte[] b = new byte[dataBytes.length - seekPosition];
-        reader.readFully(b);
-
-        byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
-
-        assertArrayEquals(expected, b);
-
-        assertTrue(reader.isEOF());
-
-        reader.close();
-    }
-
-    @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
-    public void corruptionDetection() throws IOException
-    {
-        final File data = File.createTempFile("corruptionDetection", "data");
-        final File crc = File.createTempFile("corruptionDetection", "crc");
-
-        final byte[] expected = new byte[5 * 1024];
-        Arrays.fill(expected, (byte) 0);
-
-        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
-        writer.write(expected);
-        writer.finish();
-
-        assert data.exists();
-
-        // simulate corruption of file
-        try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
-        {
-            dataFile.seek(1024);
-            dataFile.write((byte) 5);
-        }
-
-        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
-        byte[] b = new byte[expected.length];
-        reader.readFully(b);
-
-        assertArrayEquals(expected, b);
-
-        assertTrue(reader.isEOF());
-
-        reader.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
deleted file mode 100644
index edbd603..0000000
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
-
-public class RandomAccessReaderTest
-{
-    @Test
-    public void testReadFully() throws IOException
-    {
-        testReadImpl(1, 0);
-    }
-
-    @Test
-    public void testReadLarge() throws IOException
-    {
-        testReadImpl(1000, 0);
-    }
-
-    @Test
-    public void testReadLargeWithSkip() throws IOException
-    {
-        testReadImpl(1000, 322);
-    }
-
-    @Test
-    public void testReadBufferSizeNotAligned() throws IOException
-    {
-        testReadImpl(1000, 0, 5122);
-    }
-
-    private void testReadImpl(int numIterations, int skipIterations) throws IOException
-    {
-        testReadImpl(numIterations, skipIterations, RandomAccessReader.DEFAULT_BUFFER_SIZE);
-    }
-
-    private void testReadImpl(int numIterations, int skipIterations, int bufferSize) throws IOException
-    {
-        final File f = File.createTempFile("testReadFully", "1");
-        final String expected = "The quick brown fox jumps over the lazy dog";
-
-        SequentialWriter writer = SequentialWriter.open(f);
-        for (int i = 0; i < numIterations; i++)
-            writer.write(expected.getBytes());
-        writer.finish();
-
-        assert f.exists();
-
-        ChannelProxy channel = new ChannelProxy(f);
-        RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L);
-        assertEquals(f.getAbsolutePath(), reader.getPath());
-        assertEquals(expected.length() * numIterations, reader.length());
-
-        if (skipIterations > 0)
-        {
-            reader.seek(skipIterations * expected.length());
-        }
-
-        byte[] b = new byte[expected.length()];
-        int n = numIterations - skipIterations;
-        for (int i = 0; i < n; i++)
-        {
-            reader.readFully(b);
-            assertEquals(expected, new String(b));
-        }
-
-        assertTrue(reader.isEOF());
-        assertEquals(0, reader.bytesRemaining());
-
-        reader.close();
-        channel.close();
-    }
-
-    @Test
-    public void testReadBytes() throws IOException
-    {
-        File f = File.createTempFile("testReadBytes", "1");
-        final String expected = "The quick brown fox jumps over the lazy dog";
-
-        SequentialWriter writer = SequentialWriter.open(f);
-        writer.write(expected.getBytes());
-        writer.finish();
-
-        assert f.exists();
-
-        ChannelProxy channel = new ChannelProxy(f);
-        RandomAccessReader reader = RandomAccessReader.open(channel);
-        assertEquals(f.getAbsolutePath(), reader.getPath());
-        assertEquals(expected.length(), reader.length());
-
-        ByteBuffer b = reader.readBytes(expected.length());
-        assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-
-        assertTrue(reader.isEOF());
-        assertEquals(0, reader.bytesRemaining());
-
-        reader.close();
-        channel.close();
-    }
-
-    @Test
-    public void testReset() throws IOException
-    {
-        File f = File.createTempFile("testMark", "1");
-        final String expected = "The quick brown fox jumps over the lazy dog";
-        final int numIterations = 10;
-
-        SequentialWriter writer = SequentialWriter.open(f);
-        for (int i = 0; i < numIterations; i++)
-            writer.write(expected.getBytes());
-        writer.finish();
-
-        assert f.exists();
-
-        ChannelProxy channel = new ChannelProxy(f);
-        RandomAccessReader reader = RandomAccessReader.open(channel);
-        assertEquals(expected.length() * numIterations, reader.length());
-
-        ByteBuffer b = reader.readBytes(expected.length());
-        assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-
-        assertFalse(reader.isEOF());
-        assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
-
-        FileMark mark = reader.mark();
-        assertEquals(0, reader.bytesPastMark());
-        assertEquals(0, reader.bytesPastMark(mark));
-
-        for (int i = 0; i < (numIterations - 1); i++)
-        {
-            b = reader.readBytes(expected.length());
-            assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-        }
-        assertTrue(reader.isEOF());
-        assertEquals(expected.length() * (numIterations -1), reader.bytesPastMark());
-        assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark));
-
-        reader.reset(mark);
-        assertEquals(0, reader.bytesPastMark());
-        assertEquals(0, reader.bytesPastMark(mark));
-        assertFalse(reader.isEOF());
-        for (int i = 0; i < (numIterations - 1); i++)
-        {
-            b = reader.readBytes(expected.length());
-            assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-        }
-
-        reader.reset();
-        assertEquals(0, reader.bytesPastMark());
-        assertEquals(0, reader.bytesPastMark(mark));
-        assertFalse(reader.isEOF());
-        for (int i = 0; i < (numIterations - 1); i++)
-        {
-            b = reader.readBytes(expected.length());
-            assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-        }
-
-        assertTrue(reader.isEOF());
-        reader.close();
-        channel.close();
-    }
-
-    @Test
-    public void testSeekSingleThread() throws IOException, InterruptedException
-    {
-        testSeek(1);
-    }
-
-    @Test
-    public void testSeekMultipleThreads() throws IOException, InterruptedException
-    {
-        testSeek(10);
-    }
-
-    private void testSeek(int numThreads) throws IOException, InterruptedException
-    {
-        final File f = File.createTempFile("testMark", "1");
-        final String[] expected = new String[10];
-        int len = 0;
-        for (int i = 0; i < expected.length; i++)
-        {
-            expected[i] = UUID.randomUUID().toString();
-            len += expected[i].length();
-        }
-        final int totalLength = len;
-
-        SequentialWriter writer = SequentialWriter.open(f);
-        for (int i = 0; i < expected.length; i++)
-            writer.write(expected[i].getBytes());
-        writer.finish();
-
-        assert f.exists();
-
-        final ChannelProxy channel = new ChannelProxy(f);
-
-        final Runnable worker = new Runnable() {
-
-            @Override
-            public void run()
-            {
-                try
-                {
-                    RandomAccessReader reader = RandomAccessReader.open(channel);
-                    assertEquals(totalLength, reader.length());
-
-                    ByteBuffer b = reader.readBytes(expected[0].length());
-                    assertEquals(expected[0], new String(b.array(), Charset.forName("UTF-8")));
-
-                    assertFalse(reader.isEOF());
-                    assertEquals(totalLength - expected[0].length(), reader.bytesRemaining());
-
-                    long filePointer = reader.getFilePointer();
-
-                    for (int i = 1; i < expected.length; i++)
-                    {
-                        b = reader.readBytes(expected[i].length());
-                        assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
-                    }
-                    assertTrue(reader.isEOF());
-
-                    reader.seek(filePointer);
-                    assertFalse(reader.isEOF());
-                    for (int i = 1; i < expected.length; i++)
-                    {
-                        b = reader.readBytes(expected[i].length());
-                        assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
-                    }
-
-                    assertTrue(reader.isEOF());
-                    reader.close();
-                }
-                catch (Exception ex)
-                {
-                    ex.printStackTrace();
-                    fail(ex.getMessage());
-                }
-            }
-        };
-
-        if(numThreads == 1)
-        {
-            worker.run();
-            return;
-        }
-
-        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
-        for (int i = 0; i < numThreads; i++)
-            executor.submit(worker);
-
-        executor.shutdown();
-        executor.awaitTermination(1, TimeUnit.MINUTES);
-
-        channel.close();
-    }
-}

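The hunks that follow switch CompressedRandomAccessReaderTest to the Builder-based reader construction and add an mmap variant. A minimal sketch of that construction pattern, using only the calls that appear verbatim in the hunk below (the Builder constructor, regions(), MmappedRegions.map() and build()); it is not part of the patch, and the class and method names (MmapReaderSketch, readWholeFile) are invented for illustration.

import java.io.File;
import java.io.IOException;

import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.MmappedRegions;
import org.apache.cassandra.io.util.RandomAccessReader;

// Illustrative only: mirrors the construction calls used in the updated test below.
final class MmapReaderSketch
{
    static byte[] readWholeFile(File f) throws IOException
    {
        try (ChannelProxy channel = new ChannelProxy(f))
        {
            // an uncompressed reader backed by memory-mapped regions covering the whole file
            RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel);
            builder.regions(MmappedRegions.map(channel, f.length()));

            byte[] b;
            try (RandomAccessReader reader = builder.build())
            {
                b = new byte[(int) reader.length()];
                reader.readFully(b);
            }

            // as in the test, the regions outlive the reader and are closed separately
            builder.regions.close();
            return b;
        }
    }
}

As in the updated testResetAndTruncate below, the mapped regions are closed after the reader rather than by it, which suggests the reader does not take ownership of them.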
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 8f94cf2..9154d79 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.MmappedRegions;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.schema.CompressionParams;
@@ -39,6 +40,8 @@ import org.apache.cassandra.utils.SyncUtil;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 public class CompressedRandomAccessReaderTest
 {
@@ -46,16 +49,24 @@ public class CompressedRandomAccessReaderTest
     public void testResetAndTruncate() throws IOException
     {
         // test reset in current buffer or previous one
-        testResetAndTruncate(File.createTempFile("normal", "1"), false, 10);
-        testResetAndTruncate(File.createTempFile("normal", "2"), false, CompressionParams.DEFAULT_CHUNK_LENGTH);
+        testResetAndTruncate(File.createTempFile("normal", "1"), false, false, 10);
+        testResetAndTruncate(File.createTempFile("normal", "2"), false, false, CompressionParams.DEFAULT_CHUNK_LENGTH);
     }
 
     @Test
     public void testResetAndTruncateCompressed() throws IOException
     {
         // test reset in current buffer or previous one
-        testResetAndTruncate(File.createTempFile("compressed", "1"), true, 10);
-        testResetAndTruncate(File.createTempFile("compressed", "2"), true, CompressionParams.DEFAULT_CHUNK_LENGTH);
+        testResetAndTruncate(File.createTempFile("compressed", "1"), true, false, 10);
+        testResetAndTruncate(File.createTempFile("compressed", "2"), true, false, CompressionParams.DEFAULT_CHUNK_LENGTH);
+    }
+
+    @Test
+    public void testResetAndTruncateCompressedMmap() throws IOException
+    {
+        // test reset in current buffer or previous one
+        testResetAndTruncate(File.createTempFile("compressed_mmap", "1"), true, true, 10);
+        testResetAndTruncate(File.createTempFile("compressed_mmap", "2"), true, true, CompressionParams.DEFAULT_CHUNK_LENGTH);
     }
 
     @Test
@@ -63,87 +74,102 @@ public class CompressedRandomAccessReaderTest
     {
         File f = File.createTempFile("compressed6791_", "3");
         String filename = f.getAbsolutePath();
-        ChannelProxy channel = new ChannelProxy(f);
-        try
+        try(ChannelProxy channel = new ChannelProxy(f))
         {
 
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
-            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector);
+            try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector))
+            {
 
-            for (int i = 0; i < 20; i++)
-                writer.write("x".getBytes());
+                for (int i = 0; i < 20; i++)
+                    writer.write("x".getBytes());
 
-            FileMark mark = writer.mark();
-            // write enough garbage to create new chunks:
-            for (int i = 0; i < 40; ++i)
-                writer.write("y".getBytes());
+                FileMark mark = writer.mark();
+                // write enough garbage to create new chunks:
+                for (int i = 0; i < 40; ++i)
+                    writer.write("y".getBytes());
 
-            writer.resetAndTruncate(mark);
+                writer.resetAndTruncate(mark);
 
-            for (int i = 0; i < 20; i++)
-                writer.write("x".getBytes());
-            writer.finish();
+                for (int i = 0; i < 20; i++)
+                    writer.write("x".getBytes());
+                writer.finish();
+            }
 
-            CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
-            String res = reader.readLine();
-            assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
-            assertEquals(40, res.length());
+            try(RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel,
+                                                                                     new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
+                                            .build())
+            {
+                String res = reader.readLine();
+                assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+                assertEquals(40, res.length());
+            }
         }
         finally
         {
-            // cleanup
-            channel.close();
-
             if (f.exists())
-                f.delete();
+                assertTrue(f.delete());
             File metadata = new File(filename+ ".metadata");
             if (metadata.exists())
                 metadata.delete();
         }
     }
 
-    private void testResetAndTruncate(File f, boolean compressed, int junkSize) throws IOException
+    private static void testResetAndTruncate(File f, boolean compressed, boolean usemmap, int junkSize) throws IOException
     {
         final String filename = f.getAbsolutePath();
-        ChannelProxy channel = new ChannelProxy(f);
-        try
+        try(ChannelProxy channel = new ChannelProxy(f))
         {
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null);
-            SequentialWriter writer = compressed
+            try(SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(), sstableMetadataCollector)
-                : SequentialWriter.open(f);
+                : SequentialWriter.open(f))
+            {
+                writer.write("The quick ".getBytes());
+                FileMark mark = writer.mark();
+                writer.write("blue fox jumps over the lazy dog".getBytes());
 
-            writer.write("The quick ".getBytes());
-            FileMark mark = writer.mark();
-            writer.write("blue fox jumps over the lazy dog".getBytes());
+                // write enough to be sure to change chunk
+                for (int i = 0; i < junkSize; ++i)
+                {
+                    writer.write((byte) 1);
+                }
 
-            // write enough to be sure to change chunk
-            for (int i = 0; i < junkSize; ++i)
+                writer.resetAndTruncate(mark);
+                writer.write("brown fox jumps over the lazy dog".getBytes());
+                writer.finish();
+            }
+            assert f.exists();
+
+            CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null;
+            RandomAccessReader.Builder builder = compressed
+                                                 ? new CompressedRandomAccessReader.Builder(channel, compressionMetadata)
+                                                 : new RandomAccessReader.Builder(channel);
+
+            if (usemmap)
             {
-                writer.write((byte)1);
+                if (compressed)
+                    builder.regions(MmappedRegions.map(channel, compressionMetadata));
+                else
+                    builder.regions(MmappedRegions.map(channel, f.length()));
             }
 
-            writer.resetAndTruncate(mark);
-            writer.write("brown fox jumps over the lazy dog".getBytes());
-            writer.finish();
+            try(RandomAccessReader reader = builder.build())
+            {
+                String expected = "The quick brown fox jumps over the lazy dog";
+                assertEquals(expected.length(), reader.length());
+                byte[] b = new byte[expected.length()];
+                reader.readFully(b);
+                assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\'';
+            }
 
-            assert f.exists();
-            RandomAccessReader reader = compressed
-                                      ? CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
-                                      : RandomAccessReader.open(f);
-            String expected = "The quick brown fox jumps over the lazy dog";
-            assertEquals(expected.length(), reader.length());
-            byte[] b = new byte[expected.length()];
-            reader.readFully(b);
-            assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + "'";
+            if (usemmap)
+                builder.regions.close();
         }
         finally
         {
-            // cleanup
-            channel.close();
-
             if (f.exists())
-                f.delete();
+                assertTrue(f.delete());
             File metadata = new File(filename + ".metadata");
             if (compressed && metadata.exists())
                 metadata.delete();
@@ -161,6 +187,9 @@ public class CompressedRandomAccessReaderTest
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
+        assertTrue(file.createNewFile());
+        assertTrue(metadata.createNewFile());
+
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null);
         try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), CompressionParams.snappy(), sstableMetadataCollector))
         {
@@ -168,74 +197,74 @@ public class CompressedRandomAccessReaderTest
             writer.finish();
         }
 
-        ChannelProxy channel = new ChannelProxy(file);
-
-        // open compression metadata and get chunk information
-        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
-        CompressionMetadata.Chunk chunk = meta.chunkFor(0);
-
-        RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta);
-        // read and verify compressed data
-        assertEquals(CONTENT, reader.readLine());
-
-        Random random = new Random();
-        RandomAccessFile checksumModifier = null;
-
-        try
+        try(ChannelProxy channel = new ChannelProxy(file))
         {
-            checksumModifier = new RandomAccessFile(file, "rw");
-            byte[] checksum = new byte[4];
-
-            // seek to the end of the compressed chunk
-            checksumModifier.seek(chunk.length);
-            // read checksum bytes
-            checksumModifier.read(checksum);
-            // seek back to the chunk end
-            checksumModifier.seek(chunk.length);
-
-            // lets modify one byte of the checksum on each iteration
-            for (int i = 0; i < checksum.length; i++)
-            {
-                checksumModifier.write(random.nextInt());
-                SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
+            // open compression metadata and get chunk information
+            CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
+            CompressionMetadata.Chunk chunk = meta.chunkFor(0);
+
+            try(RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, meta).build())
+            { // read and verify compressed data
+                assertEquals(CONTENT, reader.readLine());
 
-                final RandomAccessReader r = CompressedRandomAccessReader.open(channel, meta);
+                Random random = new Random();
+                RandomAccessFile checksumModifier = null;
 
-                Throwable exception = null;
                 try
                 {
-                    r.readLine();
+                    checksumModifier = new RandomAccessFile(file, "rw");
+                    byte[] checksum = new byte[4];
+
+                    // seek to the end of the compressed chunk
+                    checksumModifier.seek(chunk.length);
+                    // read checksum bytes
+                    checksumModifier.read(checksum);
+                    // seek back to the chunk end
+                    checksumModifier.seek(chunk.length);
+
+                    // let's modify one byte of the checksum on each iteration
+                    for (int i = 0; i < checksum.length; i++)
+                    {
+                        checksumModifier.write(random.nextInt());
+                        SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
+
+                        try (final RandomAccessReader r = new CompressedRandomAccessReader.Builder(channel, meta).build())
+                        {
+                            Throwable exception = null;
+                            try
+                            {
+                                r.readLine();
+                            }
+                            catch (Throwable t)
+                            {
+                                exception = t;
+                            }
+                            assertNotNull(exception);
+                            assertSame(exception.getClass(), CorruptSSTableException.class);
+                            assertSame(exception.getCause().getClass(), CorruptBlockException.class);
+                        }
+                    }
+
+                    // let's write the original checksum back and check that we can read the data
+                    updateChecksum(checksumModifier, chunk.length, checksum);
+
+                    try (RandomAccessReader cr = new CompressedRandomAccessReader.Builder(channel, meta).build())
+                    {
+                        // read and verify compressed data
+                        assertEquals(CONTENT, cr.readLine());
+                        // the reader is closed automatically by try-with-resources
+                    }
                 }
-                catch (Throwable t)
+                finally
                 {
-                    exception = t;
+                    if (checksumModifier != null)
+                        checksumModifier.close();
                 }
-                assertNotNull(exception);
-                assertEquals(exception.getClass(), CorruptSSTableException.class);
-                assertEquals(exception.getCause().getClass(), CorruptBlockException.class);
-
-                r.close();
             }
-
-            // lets write original checksum and check if we can read data
-            updateChecksum(checksumModifier, chunk.length, checksum);
-
-            reader = CompressedRandomAccessReader.open(channel, meta);
-            // read and verify compressed data
-            assertEquals(CONTENT, reader.readLine());
-            // close reader
-            reader.close();
-        }
-        finally
-        {
-            channel.close();
-
-            if (checksumModifier != null)
-                checksumModifier.close();
         }
     }
 
-    private void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException
+    private static void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException
     {
         file.seek(checksumOffset);
         file.write(checksum);