You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/05/26 17:22:21 UTC

cassandra git commit: Improve ByteBuffer compression interface

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 28afa1c27 -> 3adfd1575


Improve ByteBuffer compression interface

Patch by blambov; reviewed by jmckenzie for CASSANDRA-9096


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3adfd157
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3adfd157
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3adfd157

Branch: refs/heads/cassandra-2.2
Commit: 3adfd1575cc3cf81397a7d148d5b2ed768b67014
Parents: 28afa1c
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue May 26 11:18:50 2015 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Tue May 26 11:18:50 2015 -0400

----------------------------------------------------------------------
 .../db/commitlog/CompressedSegment.java         |  44 +++---
 .../cassandra/io/compress/BufferType.java       |  45 ++++++
 .../compress/CompressedRandomAccessReader.java  |  50 +++----
 .../io/compress/CompressedSequentialWriter.java |  47 +++---
 .../io/compress/DeflateCompressor.java          | 133 +++++++++++++----
 .../cassandra/io/compress/ICompressor.java      |  41 +++--
 .../cassandra/io/compress/LZ4Compressor.java    |  57 ++++---
 .../cassandra/io/compress/SnappyCompressor.java |  38 +++--
 .../io/util/ChecksummedSequentialWriter.java    |   6 +-
 .../cassandra/io/util/RandomAccessReader.java   |  13 +-
 .../cassandra/io/util/SequentialWriter.java     |  10 +-
 .../cassandra/io/util/ThrottledReader.java      |   4 +-
 .../org/apache/cassandra/utils/FBUtilities.java |   9 +-
 .../db/commitlog/CommitLogStressTest.java       |  21 +--
 .../io/compress/CompressorPerformance.java      |  99 ++++++++++++
 .../cassandra/io/RandomAccessReaderTest.java    |  11 +-
 .../CompressedRandomAccessReaderTest.java       |   2 +-
 .../CompressedSequentialWriterTest.java         |   4 +-
 .../cassandra/io/compress/CompressorTest.java   | 149 +++++++++++--------
 .../cassandra/io/util/DataOutputTest.java       |   4 +-
 .../cassandra/io/util/SequentialWriterTest.java |   4 +-
 21 files changed, 504 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index f3a80bc..73bc5e2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -24,8 +24,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
 import org.apache.cassandra.io.util.FileUtils;
 
 /*
@@ -34,10 +34,10 @@ import org.apache.cassandra.io.util.FileUtils;
  */
 public class CompressedSegment extends CommitLogSegment
 {
-    static private final ThreadLocal<WrappedByteBuffer> compressedBufferHolder = new ThreadLocal<WrappedByteBuffer>() {
-        protected WrappedByteBuffer initialValue()
+    static private final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
+        protected ByteBuffer initialValue()
         {
-            return new WrappedByteBuffer(ByteBuffer.allocate(0));
+            return ByteBuffer.allocate(0);
         }
     };
     static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
@@ -69,17 +69,9 @@ public class CompressedSegment extends CommitLogSegment
         }
     }
 
-    static ByteBuffer allocate(ICompressor compressor, int size)
-    {
-        if (compressor.useDirectOutputByteBuffers())
-            return ByteBuffer.allocateDirect(size);
-        else
-            return ByteBuffer.allocate(size);
-    }
-    
     ByteBuffer allocate(int size)
     {
-        return allocate(compressor, size);
+        return compressor.preferredBufferType().allocate(size);
     }
 
     ByteBuffer createBuffer(CommitLog commitLog)
@@ -88,7 +80,7 @@ public class CompressedSegment extends CommitLogSegment
         if (buf == null)
         {
             // this.compressor is not yet set, so we must use the commitLog's one.
-            buf = allocate(commitLog.compressor, DatabaseDescriptor.getCommitLogSegmentSize());
+            buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
         } else
             buf.clear();
         return buf;
@@ -104,26 +96,24 @@ public class CompressedSegment extends CommitLogSegment
         // The length may be 0 when the segment is being closed.
         assert length > 0 || length == 0 && !isStillAllocating();
 
-        try {
-
-            int compressedLength = compressor.initialCompressedBufferLength(length);
-            WrappedByteBuffer wrappedCompressedBuffer = compressedBufferHolder.get();
-            ByteBuffer compressedBuffer = wrappedCompressedBuffer.buffer;
-            if (compressedBuffer.isDirect() != compressor.useDirectOutputByteBuffers() ||
-                compressedBuffer.capacity() < compressedLength + COMPRESSED_MARKER_SIZE)
+        try
+        {
+            int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
+            ByteBuffer compressedBuffer = compressedBufferHolder.get();
+            if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
+                compressedBuffer.capacity() < neededBufferSize)
             {
-                compressedBuffer = allocate(compressedLength + COMPRESSED_MARKER_SIZE);
-                FileUtils.clean(wrappedCompressedBuffer.buffer);
-                wrappedCompressedBuffer.buffer = compressedBuffer;
+                FileUtils.clean(compressedBuffer);
+                compressedBuffer = allocate(neededBufferSize);
+                compressedBufferHolder.set(compressedBuffer);
             }
 
             ByteBuffer inputBuffer = buffer.duplicate();
             inputBuffer.limit(contentStart + length).position(contentStart);
             compressedBuffer.limit(compressedBuffer.capacity()).position(COMPRESSED_MARKER_SIZE);
-            compressedLength = compressor.compress(inputBuffer, wrappedCompressedBuffer);
+            compressor.compress(inputBuffer, compressedBuffer);
 
-            compressedBuffer.position(0);
-            compressedBuffer.limit(COMPRESSED_MARKER_SIZE + compressedLength);
+            compressedBuffer.flip();
             compressedBuffer.putInt(SYNC_MARKER_SIZE, length);
 
             // Only one thread can be here at a given time.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/BufferType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/BufferType.java b/src/java/org/apache/cassandra/io/compress/BufferType.java
new file mode 100644
index 0000000..8817802
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/BufferType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.nio.ByteBuffer;
+
+public enum BufferType
+{
+    ON_HEAP
+    {
+        public ByteBuffer allocate(int size)
+        {
+            return ByteBuffer.allocate(size);
+        }
+    },
+    OFF_HEAP
+    {
+        public ByteBuffer allocate(int size)
+        {
+            return ByteBuffer.allocateDirect(size);
+        }
+    };
+
+    public abstract ByteBuffer allocate(int size);
+
+    public static BufferType typeOf(ByteBuffer buffer)
+    {
+        return buffer.isDirect() ? OFF_HEAP : ON_HEAP;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 1febe37..e6ac60a 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.io.compress;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
@@ -29,8 +28,6 @@ import java.util.zip.Adler32;
 
 import com.google.common.primitives.Ints;
 
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.*;
@@ -81,24 +78,23 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
     protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException
     {
-        super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
+        super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
         this.metadata = metadata;
         checksum = new Adler32();
 
         chunkSegments = file == null ? null : file.chunkSegments();
         if (chunkSegments == null)
         {
-            compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().useDirectOutputByteBuffers());
+            compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().preferredBufferType());
             checksumBytes = ByteBuffer.wrap(new byte[4]);
         }
     }
 
-    protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
+    @Override
+    protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType)
     {
         assert Integer.bitCount(bufferSize) == 1;
-        return useDirect
-                ? ByteBuffer.allocateDirect(bufferSize)
-                : ByteBuffer.allocate(bufferSize);
+        return bufferType.allocate(bufferSize);
     }
 
     @Override
@@ -120,33 +116,32 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 
             if (compressed.capacity() < chunk.length)
-                compressed = ByteBuffer.wrap(new byte[chunk.length]);
+                compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType());
             else
                 compressed.clear();
             compressed.limit(chunk.length);
 
             if (channel.read(compressed, chunk.offset) != chunk.length)
                 throw new CorruptBlockException(getPath(), chunk);
-
-            // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
-            // in the future this will save a lot of hair-pulling
             compressed.flip();
             buffer.clear();
-            int decompressedBytes;
+
             try
             {
-                decompressedBytes = metadata.compressor().uncompress(compressed, buffer);
-                buffer.limit(decompressedBytes);
+                metadata.compressor().uncompress(compressed, buffer);
             }
             catch (IOException e)
             {
-                buffer.limit(0);
                 throw new CorruptBlockException(getPath(), chunk);
             }
+            finally
+            {
+                buffer.flip();
+            }
 
             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
-                compressed.position(0);
+                compressed.rewind();
                 FBUtilities.directCheckSum(checksum, compressed);
 
                 if (checksum(chunk) != (int) checksum.getValue())
@@ -186,39 +181,32 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
             long segmentOffset = entry.getKey();
             int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
-            ByteBuffer compressedChunk = entry.getValue().duplicate();
+            ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
 
-            compressedChunk.position(chunkOffset);
-            compressedChunk.limit(chunkOffset + chunk.length);
-            compressedChunk.mark();
+            compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 
             buffer.clear();
-            int decompressedBytes;
+
             try
             {
-                decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer);
-                buffer.limit(decompressedBytes);
+                metadata.compressor().uncompress(compressedChunk, buffer);
             }
             catch (IOException e)
             {
-                buffer.limit(0);
                 throw new CorruptBlockException(getPath(), chunk);
             }
             finally
             {
-                compressedChunk.limit(compressedChunk.capacity());
+                buffer.flip();
             }
 
             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
-                compressedChunk.reset();
-                compressedChunk.limit(chunkOffset + chunk.length);
+                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 
                 FBUtilities.directCheckSum(checksum, compressedChunk);
 
                 compressedChunk.limit(compressedChunk.capacity());
-
-
                 if (compressedChunk.getInt() != (int) checksum.getValue())
                     throw new CorruptBlockException(getPath(), chunk);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6218526..9c7c776 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -47,7 +47,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     private final ICompressor compressor;
 
     // used to store compressed data
-    private final ICompressor.WrappedByteBuffer compressed;
+    private ByteBuffer compressed;
 
     // holds a number of already written chunks
     private int chunkCount = 0;
@@ -63,13 +63,11 @@ public class CompressedSequentialWriter extends SequentialWriter
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
     {
-        super(file, parameters.chunkLength(), parameters.sstableCompressor.useDirectOutputByteBuffers());
+        super(file, parameters.chunkLength(), parameters.sstableCompressor.preferredBufferType());
         this.compressor = parameters.sstableCompressor;
 
         // buffer for compression should be the same size as buffer itself
-        compressed = compressor.useDirectOutputByteBuffers()
-            ? new ICompressor.WrappedByteBuffer(ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(buffer.capacity())))
-            : new ICompressor.WrappedByteBuffer(ByteBuffer.allocate(compressor.initialCompressedBufferLength(buffer.capacity())));
+        compressed = compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity()));
 
         /* Index File (-CompressionInfo.db component) and it's header */
         metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
@@ -102,22 +100,19 @@ public class CompressedSequentialWriter extends SequentialWriter
     {
         seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
 
-        int compressedLength;
         try
         {
             // compressing data with buffer re-use
             buffer.flip();
-            compressed.buffer.clear();
-            compressedLength = compressor.compress(buffer, compressed);
-
-            // Compressors don't modify sentinels in our BB - we rely on buffer.position() for bufferOffset adjustment
-            buffer.position(buffer.limit());
+            compressed.clear();
+            compressor.compress(buffer, compressed);
         }
         catch (IOException e)
         {
             throw new RuntimeException("Compression exception", e); // shouldn't happen
         }
 
+        int compressedLength = compressed.position();
         uncompressedSize += buffer.position();
         compressedSize += compressedLength;
 
@@ -127,15 +122,13 @@ public class CompressedSequentialWriter extends SequentialWriter
             metadataWriter.addOffset(chunkOffset);
             chunkCount++;
 
-            assert compressedLength <= compressed.buffer.capacity();
-
             // write out the compressed data
-            compressed.buffer.flip();
-            channel.write(compressed.buffer);
+            compressed.flip();
+            channel.write(compressed);
 
             // write corresponding checksum
-            compressed.buffer.rewind();
-            crcMetadata.appendDirect(compressed.buffer, true);
+            compressed.rewind();
+            crcMetadata.appendDirect(compressed, true);
             lastFlushOffset += compressedLength + 4;
 
             // adjust our bufferOffset to account for the new uncompressed data we've now written out
@@ -189,24 +182,22 @@ public class CompressedSequentialWriter extends SequentialWriter
 
         // compressed chunk size (- 4 bytes reserved for checksum)
         int chunkSize = (int) (metadataWriter.chunkOffsetBy(realMark.nextChunkIndex) - chunkOffset - 4);
-        if (compressed.buffer.capacity() < chunkSize)
-            compressed.buffer = compressor.useDirectOutputByteBuffers()
-                    ? ByteBuffer.allocateDirect(chunkSize)
-                    : ByteBuffer.allocate(chunkSize);
+        if (compressed.capacity() < chunkSize)
+            compressed = compressor.preferredBufferType().allocate(chunkSize);
 
         try
         {
-            compressed.buffer.clear();
-            compressed.buffer.limit(chunkSize);
+            compressed.clear();
+            compressed.limit(chunkSize);
             channel.position(chunkOffset);
-            channel.read(compressed.buffer);
+            channel.read(compressed);
 
             try
             {
                 // Repopulate buffer from compressed data
                 buffer.clear();
-                compressed.buffer.flip();
-                compressor.uncompress(compressed.buffer, buffer);
+                compressed.flip();
+                compressor.uncompress(compressed, buffer);
             }
             catch (IOException e)
             {
@@ -214,8 +205,8 @@ public class CompressedSequentialWriter extends SequentialWriter
             }
 
             Adler32 checksum = new Adler32();
-
-            FBUtilities.directCheckSum(checksum, compressed.buffer);
+            compressed.rewind();
+            FBUtilities.directCheckSum(checksum, compressed);
 
             crcCheckBuffer.clear();
             channel.read(crcCheckBuffer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 833c375..f2ccd64 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.io.compress;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -66,45 +66,124 @@ public class DeflateCompressor implements ICompressor
         return Collections.emptySet();
     }
 
-    public int initialCompressedBufferLength(int chunkLength)
+    public int initialCompressedBufferLength(int sourceLen)
     {
-        return chunkLength;
+        // Taken from zlib deflateBound(). See http://www.zlib.net/zlib_tech.html.
+        return sourceLen + (sourceLen >> 12) + (sourceLen >> 14) + (sourceLen >> 25) + 13;
     }
 
-    public int compress(ByteBuffer src, ICompressor.WrappedByteBuffer dest)
+    public void compress(ByteBuffer input, ByteBuffer output)
     {
-        assert dest.buffer.hasArray();
+        if (input.hasArray() && output.hasArray())
+        {
+            int length = compressArray(input.array(), input.arrayOffset() + input.position(), input.remaining(),
+                                       output.array(), output.arrayOffset() + output.position(), output.remaining());
+            input.position(input.limit());
+            output.position(output.position() + length);
+        }
+        else
+            compressBuffer(input, output);
+    }
 
+    public int compressArray(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
+    {
         Deflater def = deflater.get();
         def.reset();
-        def.setInput(src.array(), src.arrayOffset() + src.position(), src.remaining());
+        def.setInput(input, inputOffset, inputLength);
         def.finish();
         if (def.needsInput())
             return 0;
 
-        int startPos = dest.buffer.position();
-        while (true)
+        int len = def.deflate(output, outputOffset, maxOutputLength);
+        assert def.finished();
+        return len;
+    }
+
+    public void compressBuffer(ByteBuffer input, ByteBuffer output)
+    {
+        Deflater def = deflater.get();
+        def.reset();
+
+        byte[] buffer = FBUtilities.getThreadLocalScratchBuffer();
+        // Use half the buffer for input, half for output.
+        int chunkLen = buffer.length / 2;
+        while (input.remaining() > chunkLen)
+        {
+            input.get(buffer, 0, chunkLen);
+            def.setInput(buffer, 0, chunkLen);
+            while (!def.needsInput())
+            {
+                int len = def.deflate(buffer, chunkLen, chunkLen);
+                output.put(buffer, chunkLen, len);
+            }
+        }
+        int inputLength = input.remaining();
+        input.get(buffer, 0, inputLength);
+        def.setInput(buffer, 0, inputLength);
+        def.finish();
+        while (!def.finished())
+        {
+            int len = def.deflate(buffer, chunkLen, chunkLen);
+            output.put(buffer, chunkLen, len);
+        }
+    }
+
+
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        if (input.hasArray() && output.hasArray())
+        {
+            int length = uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(),
+                                    output.array(), output.arrayOffset() + output.position(), output.remaining());
+            input.position(input.limit());
+            output.position(output.position() + length);
+        }
+        else
+            uncompressBuffer(input, output);
+    }
+
+    public void uncompressBuffer(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        try
         {
-            int arrayOffset = dest.buffer.arrayOffset();
-            int len = def.deflate(dest.buffer.array(), arrayOffset + dest.buffer.position(), dest.buffer.remaining());
-            dest.buffer.position(dest.buffer.position() + len);
-            if (def.finished())
+            Inflater inf = inflater.get();
+            inf.reset();
+
+            byte[] buffer = FBUtilities.getThreadLocalScratchBuffer();
+            // Use half the buffer for input, half for output.
+            int chunkLen = buffer.length / 2;
+            while (input.remaining() > chunkLen)
             {
-                return dest.buffer.position() - startPos;
+                input.get(buffer, 0, chunkLen);
+                inf.setInput(buffer, 0, chunkLen);
+                while (!inf.needsInput())
+                {
+                    int len = inf.inflate(buffer, chunkLen, chunkLen);
+                    output.put(buffer, chunkLen, len);
+                }
             }
-            else
+            int inputLength = input.remaining();
+            input.get(buffer, 0, inputLength);
+            inf.setInput(buffer, 0, inputLength);
+            while (!inf.needsInput())
             {
-                // We're not done, output was too small. Increase it and continue
-                ByteBuffer newDest = ByteBuffer.allocate(dest.buffer.capacity()*4/3 + 1);
-                dest.buffer.rewind();
-                newDest.put(dest.buffer);
-                dest.buffer = newDest;
+                int len = inf.inflate(buffer, chunkLen, chunkLen);
+                output.put(buffer, chunkLen, len);
             }
         }
+        catch (DataFormatException e)
+        {
+            throw new IOException(e);
+        }
     }
 
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
     {
+        return uncompress(input, inputOffset, inputLength, output, outputOffset, output.length - outputOffset);
+    }
+
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) throws IOException
+    {
         Inflater inf = inflater.get();
         inf.reset();
         inf.setInput(input, inputOffset, inputLength);
@@ -114,7 +193,7 @@ public class DeflateCompressor implements ICompressor
         // We assume output is big enough
         try
         {
-            return inf.inflate(output, outputOffset, output.length - outputOffset);
+            return inf.inflate(output, outputOffset, maxOutputLength);
         }
         catch (DataFormatException e)
         {
@@ -122,18 +201,14 @@ public class DeflateCompressor implements ICompressor
         }
     }
 
-    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    public boolean supports(BufferType bufferType)
     {
-        if (!output.hasArray())
-            throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");
-
-        if (input.hasArray())
-            return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
-        return uncompress(ByteBufferUtil.getArray(input), 0, input.remaining(), output.array(), output.arrayOffset() + output.position());
+        return true;
     }
 
-    public boolean useDirectOutputByteBuffers()
+    public BufferType preferredBufferType()
     {
-        return false;
+        // Prefer array-backed buffers.
+        return BufferType.ON_HEAP;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/ICompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index 0326a9f..5719834 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -28,38 +28,33 @@ public interface ICompressor
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
 
     /**
-     * Compression for ByteBuffers
+     * Compression for ByteBuffers.
+     *
+     * The data between input.position() and input.limit() is compressed and placed into output starting from output.position().
+     * Positions in both buffers are moved to reflect the bytes read and written. Limits are not changed.
      */
-    public int compress(ByteBuffer input, WrappedByteBuffer output) throws IOException;
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException;
 
     /**
-     * Decompression for DirectByteBuffers
+     * Decompression for DirectByteBuffers.
+     *
+     * The data between input.position() and input.limit() is uncompressed and placed into output starting from output.position().
+     * Positions in both buffers are moved to reflect the bytes read and written. Limits are not changed.
      */
-    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
 
     /**
-     * Notifies user if this compressor will wants/requires a direct byte buffers to
-     * decompress direct byteBuffers
+     * Returns the preferred (most efficient) buffer type for this compressor.
      */
-    public boolean useDirectOutputByteBuffers();
-
-    public Set<String> supportedOptions();
+    public BufferType preferredBufferType();
 
     /**
-     * A simple wrapped Bytebuffer.
-     * Not all implementations allow us to know the maximum size after
-     * compression. This makes it hard to size the output buffer for compression
-     * (and we want to reuse the buffer).  Instead we use this wrapped ByteBuffer
-     * so that compress(...) can have the liberty to resize the underlying array if
-     * necessary.
+     * Checks if the given buffer would be supported by the compressor. If a type is supported the compressor must be
+     * able to use it in combination with all other supported types.
+     *
+     * Direct and memory-mapped buffers must be supported by all compressors.
      */
-    public static class WrappedByteBuffer
-    {
-        public ByteBuffer buffer;
+    public boolean supports(BufferType bufferType);
 
-        public WrappedByteBuffer(ByteBuffer buffer)
-        {
-            this.buffer = buffer;
-        }
-    }
+    public Set<String> supportedOptions();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 9d54048..5fd4309 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -55,20 +55,17 @@ public class LZ4Compressor implements ICompressor
         return INTEGER_BYTES + compressor.maxCompressedLength(chunkLength);
     }
 
-    public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
     {
-        final ByteBuffer buf = dest.buffer;
-        int len = src.remaining();
-        dest.buffer.put((byte) len);
-        dest.buffer.put((byte) (len >>> 8));
-        dest.buffer.put((byte) (len >>> 16));
-        dest.buffer.put((byte) (len >>> 24));
-
-        int start = dest.buffer.position();
+        int len = input.remaining();
+        output.put((byte) len);
+        output.put((byte) (len >>> 8));
+        output.put((byte) (len >>> 16));
+        output.put((byte) (len >>> 24));
+
         try
         {
-            compressor.compress(src, dest.buffer);
-            return INTEGER_BYTES + (buf.position() - start);
+            compressor.compress(input, output);
         }
         catch (LZ4Exception e)
         {
@@ -103,44 +100,42 @@ public class LZ4Compressor implements ICompressor
         return decompressedLength;
     }
 
-    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
     {
-        if (input.hasArray() && output.hasArray())
-            return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
-
-        int pos = input.position();
-        final int decompressedLength = (input.get(pos) & 0xFF)
-                | ((input.get(pos + 1) & 0xFF) << 8)
-                | ((input.get(pos + 2) & 0xFF) << 16)
-                | ((input.get(pos + 3) & 0xFF) << 24);
-        int inputLength = input.remaining() - INTEGER_BYTES;
+        final int decompressedLength = (input.get() & 0xFF)
+                | ((input.get() & 0xFF) << 8)
+                | ((input.get() & 0xFF) << 16)
+                | ((input.get() & 0xFF) << 24);
 
-        final int compressedLength;
         try
         {
-            compressedLength = decompressor.decompress(input, input.position() + INTEGER_BYTES, output, output.position(), decompressedLength);
+            int compressedLength = decompressor.decompress(input, input.position(), output, output.position(), decompressedLength);
+            input.position(input.position() + compressedLength);
+            output.position(output.position() + decompressedLength);
         }
         catch (LZ4Exception e)
         {
             throw new IOException(e);
         }
 
-        if (compressedLength != inputLength)
+        if (input.remaining() > 0)
         {
-            throw new IOException("Compressed lengths mismatch - got: "+compressedLength+" vs expected: "+inputLength);
+            throw new IOException("Compressed lengths mismatch - "+input.remaining()+" bytes remain");
         }
+    }
 
-        return decompressedLength;
+    public Set<String> supportedOptions()
+    {
+        return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
     }
 
-    @Override
-    public boolean useDirectOutputByteBuffers()
+    public BufferType preferredBufferType()
     {
-        return true;
+        return BufferType.OFF_HEAP;
     }
 
-    public Set<String> supportedOptions()
+    public boolean supports(BufferType bufferType)
     {
-        return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index 04f676b..9fc170a 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -79,14 +79,15 @@ public class SnappyCompressor implements ICompressor
         return Snappy.maxCompressedLength(chunkLength);
     }
 
-    public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
     {
-        int result = Snappy.compress(src, dest.buffer);
+        int dlimit = output.limit();
+        Snappy.compress(input, output);
 
-        // Snappy doesn't match LZ4 and Deflate w/regards to state it leaves dest ByteBuffer's counters in
-        dest.buffer.position(dest.buffer.limit());
-        dest.buffer.limit(dest.buffer.capacity());
-        return result;
+        // Snappy doesn't match the ICompressor contract w/regards to state it leaves dest ByteBuffer's counters in
+        output.position(output.limit());
+        output.limit(dlimit);
+        input.position(input.limit());
     }
 
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
@@ -94,16 +95,27 @@ public class SnappyCompressor implements ICompressor
         return Snappy.rawUncompress(input, inputOffset, inputLength, output, outputOffset);
     }
 
-    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    public void uncompress(ByteBuffer input, ByteBuffer output)
+            throws IOException
     {
-        if (input.hasArray() && output.hasArray())
-            return Snappy.rawUncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
-        return Snappy.uncompress(input, output);
+        int dlimit = output.limit();
+        Snappy.uncompress(input, output);
+
+        // Snappy doesn't match the ICompressor contract w/regards to state it leaves dest ByteBuffer's counters in
+        output.position(output.limit());
+        output.limit(dlimit);
+        input.position(input.limit());
+    }
+
+    public BufferType preferredBufferType()
+    {
+        return BufferType.OFF_HEAP;
     }
 
-    @Override
-    public boolean useDirectOutputByteBuffers()
+    public boolean supports(BufferType bufferType)
     {
-        return true;
+        // Snappy can't deal with different input and output buffer types.
+        // To avoid possible problems, pretend it can't support array-backed at all.
+        return bufferType == BufferType.OFF_HEAP;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index ec68c2d..d5e6be9 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.util;
 import java.io.File;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.io.compress.BufferType;
+
 public class ChecksummedSequentialWriter extends SequentialWriter
 {
     private final SequentialWriter crcWriter;
@@ -27,8 +29,8 @@ public class ChecksummedSequentialWriter extends SequentialWriter
 
     public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
     {
-        super(file, bufferSize, false);
-        crcWriter = new SequentialWriter(crcPath, 8 * 1024, false);
+        super(file, bufferSize, BufferType.ON_HEAP);
+        crcWriter = new SequentialWriter(crcPath, 8 * 1024, BufferType.ON_HEAP);
         crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
         crcMetadata.writeChunkSize(buffer.capacity());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 328095b..302f054 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class RandomAccessReader extends AbstractDataInput implements FileDataInput
@@ -45,7 +46,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
 
     protected final PoolingSegmentedFile owner;
 
-    protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, boolean useDirectBuffer, PoolingSegmentedFile owner)
+    protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, BufferType bufferType, PoolingSegmentedFile owner)
     {
         this.channel = channel.sharedCopy();
         this.owner = owner;
@@ -57,16 +58,14 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         // we can cache file length in read-only mode
         fileLength = overrideLength <= 0 ? channel.size() : overrideLength;
 
-        buffer = allocateBuffer(bufferSize, useDirectBuffer);
+        buffer = allocateBuffer(bufferSize, bufferType);
         buffer.limit(0);
     }
 
-    protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirectBuffer)
+    protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType)
     {
         int size = (int) Math.min(fileLength, bufferSize);
-        return useDirectBuffer
-                ? ByteBuffer.allocateDirect(size)
-                : ByteBuffer.allocate(size);
+        return bufferType.allocate(size);
     }
 
     public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner)
@@ -100,7 +99,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
 
     private static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize, PoolingSegmentedFile owner)
     {
-        return new RandomAccessReader(channel, bufferSize, overrideSize, false, owner);
+        return new RandomAccessReader(channel, bufferSize, overrideSize, BufferType.ON_HEAP, owner);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 3c35a34..304f702 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -27,6 +27,7 @@ import java.nio.file.StandardOpenOption;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -49,7 +50,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     private final String filePath;
 
     protected ByteBuffer buffer;
-    private final int fd;
     private int directoryFD;
     // directory should be synced only after first file sync, in other words, only once per file
     private boolean directorySynced = false;
@@ -119,7 +119,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         }
     }
 
-    public SequentialWriter(File file, int bufferSize, boolean offheap)
+    public SequentialWriter(File file, int bufferSize, BufferType bufferType)
     {
         try
         {
@@ -136,13 +136,11 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         filePath = file.getAbsolutePath();
 
         // Allow children to allocate buffer as direct (snappy compression) if necessary
-        buffer = offheap ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+        buffer = bufferType.allocate(bufferSize);
 
         this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
         this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
 
-        fd = CLibrary.getfd(channel);
-
         directoryFD = CLibrary.tryOpenDirectory(file.getParent());
         stream = new WrappedDataOutputStreamPlus(this, this);
     }
@@ -152,7 +150,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
      */
     public static SequentialWriter open(File file)
     {
-        return new SequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
+        return new SequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, BufferType.ON_HEAP);
     }
 
     public static ChecksummedSequentialWriter open(File file, File crcPath)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index 38e5cff..f725984 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -25,13 +25,15 @@ import java.io.FileNotFoundException;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.io.compress.BufferType;
+
 public class ThrottledReader extends RandomAccessReader
 {
     private final RateLimiter limiter;
 
     protected ThrottledReader(ChannelProxy channel, long overrideLength, RateLimiter limiter) throws FileNotFoundException
     {
-        super(channel, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, false, null);
+        super(channel, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, BufferType.ON_HEAP, null);
         this.limiter = limiter;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index b61f956..c20e33e 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -638,7 +638,7 @@ public class FBUtilities
         }
     }
 
-    private static final ThreadLocal<byte[]> localDigestBuffer = new ThreadLocal<byte[]>()
+    private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
     {
         @Override
         protected byte[] initialValue()
@@ -647,6 +647,11 @@ public class FBUtilities
         }
     };
 
+    public static byte[] getThreadLocalScratchBuffer()
+    {
+        return threadLocalScratchBuffer.get();
+    }
+
     //Java 7 has this method but it's private till Java 8. Thanks JDK!
     public static boolean supportsDirectChecksum()
     {
@@ -674,7 +679,7 @@ public class FBUtilities
         }
 
         //Fallback
-        byte[] buffer = localDigestBuffer.get();
+        byte[] buffer = getThreadLocalScratchBuffer();
 
         int remaining;
         while ((remaining = bb.remaining()) > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index a8cf8fd..7f9df9e 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.Assert;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.junit.BeforeClass;
@@ -53,7 +53,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 
@@ -110,7 +109,7 @@ public class CommitLogStressTest
             CommitLogStressTest tester = new CommitLogStressTest();
             tester.testFixedSize();
         }
-        catch (Exception e)
+        catch (Throwable e)
         {
             e.printStackTrace(System.err);
         }
@@ -272,7 +271,7 @@ public class CommitLogStressTest
     {
         stop = false;
         for (int ii = 0; ii < NUM_THREADS; ii++) {
-            final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+            final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii));
             threads.add(t);
             t.start();
         }
@@ -313,7 +312,7 @@ public class CommitLogStressTest
         return maxMemory / (1024 * 1024);
     }
 
-    public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
+    public static ByteBuffer randomBytes(int quantity, Random tlr) {
         ByteBuffer slice = ByteBuffer.allocate(quantity);
         ByteBuffer source = dataSource.duplicate();
         source.position(tlr.nextInt(source.capacity() - quantity));
@@ -329,27 +328,29 @@ public class CommitLogStressTest
         int cells = 0;
         int dataSize = 0;
         final CommitLog commitLog;
+        final Random random;
 
         volatile ReplayPosition rp;
 
-        public CommitlogExecutor(CommitLog commitLog)
+        public CommitlogExecutor(CommitLog commitLog, Random rand)
         {
             this.commitLog = commitLog;
+            this.random = rand;
         }
 
         public void run() {
             RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
-            final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+            final Random rand = random != null ? random : ThreadLocalRandom.current();
             while (!stop) {
                 if (rl != null)
                     rl.acquire();
                 String ks = "Keyspace1";
-                ByteBuffer key = randomBytes(16, tlr);
+                ByteBuffer key = randomBytes(16, rand);
                 Mutation mutation = new Mutation(ks, key);
 
                 for (int ii = 0; ii < numCells; ii++) {
-                    int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
-                    ByteBuffer bytes = randomBytes(sz, tlr);
+                    int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
+                    ByteBuffer bytes = randomBytes(sz, rand);
                     mutation.add("Standard1", Util.cellname("name" + ii), bytes,
                             System.currentTimeMillis());
                     hash = hash(hash, bytes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
new file mode 100644
index 0000000..7401951
--- /dev/null
+++ b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
@@ -0,0 +1,99 @@
+package org.apache.cassandra.io.compress;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class CompressorPerformance
+{
+
+    static public void testPerformances() throws IOException
+    {
+        for (ICompressor compressor: new ICompressor[] {
+                SnappyCompressor.instance,  // warm up
+                DeflateCompressor.instance,
+                LZ4Compressor.instance,
+                SnappyCompressor.instance
+        })
+        {
+            for (BufferType in: BufferType.values())
+            {
+                if (compressor.supports(in))
+                {
+                    for (BufferType out: BufferType.values())
+                    {
+                        if (compressor.supports(out))
+                        {
+                            for (int i=0; i<10; ++i)
+                                testPerformance(compressor, in, out);
+                            System.out.println();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    static ByteBuffer dataSource;
+    static int bufLen;
+
+    static private void testPerformance(ICompressor compressor, BufferType in, BufferType out) throws IOException
+    {
+        int len = dataSource.capacity();
+        int bufLen = compressor.initialCompressedBufferLength(len);
+        ByteBuffer input = in.allocate(bufLen);
+        ByteBuffer output = out.allocate(bufLen);
+
+        int checksum = 0;
+        int count = 100;
+
+        long time = System.nanoTime();
+        for (int i=0; i<count; ++i)
+        {
+            output.clear();
+            compressor.compress(dataSource, output);
+            // Make sure not optimized away.
+            checksum += output.get(ThreadLocalRandom.current().nextInt(output.position()));
+            dataSource.rewind();
+        }
+        long timec = System.nanoTime() - time;
+        output.flip();
+        input.put(output);
+        input.flip();
+
+        time = System.nanoTime();
+        for (int i=0; i<count; ++i)
+        {
+            output.clear();
+            compressor.uncompress(input, output);
+            // Make sure not optimized away.
+            checksum += output.get(ThreadLocalRandom.current().nextInt(output.position()));
+            input.rewind();
+        }
+        long timed = System.nanoTime() - time;
+        System.out.format("Compressor %s %s->%s compress %.3f ns/b %.3f mb/s uncompress %.3f ns/b %.3f mb/s.%s\n",
+                          compressor.getClass().getSimpleName(),
+                          in,
+                          out,
+                          1.0 * timec / (count * len),
+                          Math.scalb(1.0e9, -20) * count * len / timec,
+                          1.0 * timed / (count * len),
+                          Math.scalb(1.0e9, -20) * count * len / timed,
+                          checksum == 0 ? " " : "");
+    }
+
+    public static void main(String[] args) throws IOException
+    {
+        try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+        {
+            int len = (int)fis.getChannel().size();
+            dataSource = ByteBuffer.allocateDirect(len);
+            while (dataSource.hasRemaining()) {
+                fis.getChannel().read(dataSource);
+            }
+            dataSource.flip();
+        }
+        testPerformances();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
index fe04096..71fab61 100644
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
@@ -3,9 +3,7 @@ package org.apache.cassandra.io;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
-import java.nio.file.StandardOpenOption;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -14,7 +12,6 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
-import org.apache.cassandra.io.compress.*;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -28,7 +25,7 @@ public class RandomAccessReaderTest
         final File f = File.createTempFile("testReadFully", "1");
         final String expected = "The quick brown fox jumps over the lazy dog";
 
-        SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+        SequentialWriter writer = SequentialWriter.open(f);
         writer.write(expected.getBytes());
         writer.finish();
 
@@ -56,7 +53,7 @@ public class RandomAccessReaderTest
         File f = File.createTempFile("testReadBytes", "1");
         final String expected = "The quick brown fox jumps over the lazy dog";
 
-        SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+        SequentialWriter writer = SequentialWriter.open(f);
         writer.write(expected.getBytes());
         writer.finish();
 
@@ -84,7 +81,7 @@ public class RandomAccessReaderTest
         final String expected = "The quick brown fox jumps over the lazy dog";
         final int numIterations = 10;
 
-        SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+        SequentialWriter writer = SequentialWriter.open(f);
         for (int i = 0; i < numIterations; i++)
             writer.write(expected.getBytes());
         writer.finish();
@@ -163,7 +160,7 @@ public class RandomAccessReaderTest
         }
         final int totalLength = len;
 
-        SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+        SequentialWriter writer = SequentialWriter.open(f);
         for (int i = 0; i < expected.length; i++)
             writer.write(expected[i].getBytes());
         writer.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index cfc4bb8..b013206 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -110,7 +110,7 @@ public class CompressedRandomAccessReaderTest
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
             SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
-                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+                : SequentialWriter.open(f);
 
             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 184319f..27b866d 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -142,9 +142,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
 
     private ByteBuffer makeBB(int size)
     {
-        return compressor.useDirectOutputByteBuffers()
-                ? ByteBuffer.allocateDirect(size)
-                : ByteBuffer.allocate(size);
+        return compressor.preferredBufferType().allocate(size);
     }
 
     private final List<TestableCSW> writers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index 53021ee..6018cc7 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -21,19 +21,17 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Random;
 
+import com.google.common.io.Files;
 import static org.junit.Assert.*;
-
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.io.Files;
-
-import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CompressorTest
@@ -62,27 +60,29 @@ public class CompressorTest
 
     public void testArrayUncompress(byte[] data, int off, int len) throws IOException
     {
-        ByteBuffer src = makeBB(len);
+        final int inOffset = 2;
+        ByteBuffer src = makeBB(len + inOffset);
+        src.position(inOffset);
         src.put(data, off, len);
-        src.rewind();
+        src.flip().position(inOffset);
 
         final int outOffset = 3;
-        final WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(len));
-        fillBBWithRandom(compressed.buffer);
-        compressed.buffer.clear();
-        compressed.buffer.position(outOffset);
+        final ByteBuffer compressed = makeBB(outOffset + compressor.initialCompressedBufferLength(len));
+        fillBBWithRandom(compressed);
+        compressed.position(outOffset);
 
-        final int compressedLength = compressor.compress(src, compressed);
+        compressor.compress(src, compressed);
+        compressed.flip().position(outOffset);
 
         final int restoreOffset = 5;
         final byte[] restored = new byte[restoreOffset + len];
         new Random().nextBytes(restored);
 
         // need byte[] representation which direct buffers don't have
-        byte[] compressedBytes = new byte[compressed.buffer.capacity()];
-        ByteBufferUtil.arrayCopy(compressed.buffer, outOffset, compressedBytes, 0, compressed.buffer.capacity() - outOffset);
+        byte[] compressedBytes = new byte[compressed.capacity()];
+        ByteBufferUtil.arrayCopy(compressed, outOffset, compressedBytes, outOffset, compressed.capacity() - outOffset);
 
-        final int decompressedLength = compressor.uncompress(compressedBytes, 0, compressedLength, restored, restoreOffset);
+        final int decompressedLength = compressor.uncompress(compressedBytes, outOffset, compressed.remaining(), restored, restoreOffset);
 
         assertEquals(decompressedLength, len);
         assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
@@ -128,27 +128,28 @@ public class CompressorTest
         final int outOffset = 3;
         byte[] garbage = new byte[outOffset + compressor.initialCompressedBufferLength(data.length)];
         new Random().nextBytes(garbage);
-        WrappedByteBuffer dest = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(data.length));
-        dest.buffer.put(garbage);
-        dest.buffer.clear();
-        dest.buffer.position(outOffset);
+        ByteBuffer dest = makeBB(outOffset + compressor.initialCompressedBufferLength(data.length));
+        dest.put(garbage);
+        dest.clear();
+        dest.position(outOffset);
 
-        final int compressedLength = compressor.compress(src, dest);
+        compressor.compress(src, dest);
+        int compressedLength = dest.position() - outOffset;
 
-        FileChannel channel = new FileOutputStream(temp, false).getChannel();
-        dest.buffer.clear();
-        channel.write(dest.buffer);
+        FileChannel channel = FileChannel.open(temp.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+        dest.clear();
+        channel.write(dest);
 
         MappedByteBuffer mappedData = Files.map(temp);
-        mappedData.position(outOffset);
-        mappedData.limit(compressedLength + outOffset);
-
         ByteBuffer result = makeBB(data.length + 100);
+        mappedData.position(outOffset).limit(outOffset + compressedLength);
 
-        int length = compressor.uncompress(mappedData, result);
+        compressor.uncompress(mappedData, result);
+        channel.close();
+        result.flip();
 
-        Assert.assertEquals(data.length, length);
-        for (int i = 0; i < length; i++)
+        Assert.assertEquals(data.length, result.limit());
+        for (int i = 0; i < result.limit(); i++)
         {
             Assert.assertEquals("Decompression mismatch at byte "+i, data[i], result.get());
         }
@@ -177,51 +178,71 @@ public class CompressorTest
 
     private void testByteBuffers() throws IOException
     {
-        int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
-        byte[] srcData = new byte[n];
-        new Random().nextBytes(srcData);
-
-        ByteBuffer src = makeBB(n);
-        src.put(srcData, 0, n);
-        src.flip();
-
-        int outOffset = 5;
-        ICompressor.WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(srcData.length));
-        byte[] garbage = new byte[compressed.buffer.capacity()];
-        new Random().nextBytes(garbage);
-        compressed.buffer.put(garbage);
-        compressed.buffer.clear();
-        compressed.buffer.position(outOffset);
-
-        compressor.compress(src, compressed);
-        compressed.buffer.flip();
-        compressed.buffer.position(outOffset);
-
-        ByteBuffer result = makeBB(outOffset + n);
-        int decompressed = compressor.uncompress(compressed.buffer, result);
-
-        assert decompressed == n;
-        for (int i = 0; i < n; ++i)
-            assert srcData[i] == result.get(i) : "Failed comparison on index: " + i + " with compressor: " + compressor.getClass().toString();
+        assert compressor.supports(BufferType.OFF_HEAP);
+        assert compressor.supports(compressor.preferredBufferType());
+
+        for (BufferType in: BufferType.values())
+            if (compressor.supports(in))
+                for (BufferType comp: BufferType.values())
+                    if (compressor.supports(comp))
+                        for (BufferType out: BufferType.values())
+                            if (compressor.supports(out))
+                                testByteBuffers(in, comp, out);
     }
 
-    private ByteBuffer makeBB(int size)
+    private void testByteBuffers(BufferType typeIn, BufferType typeComp, BufferType typeOut) throws IOException
     {
-        return compressor.useDirectOutputByteBuffers()
-                ? ByteBuffer.allocateDirect(size)
-                : ByteBuffer.allocate(size);
+        try
+        {
+            int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+            byte[] srcData = new byte[n];
+            new Random().nextBytes(srcData);
+
+            final int inOffset = 2;
+            ByteBuffer src = typeIn.allocate(inOffset + n + inOffset);
+            src.position(inOffset);
+            src.put(srcData, 0, n);
+            src.flip().position(inOffset);
+
+            int outOffset = 5;
+            ByteBuffer compressed = typeComp.allocate(outOffset + compressor.initialCompressedBufferLength(srcData.length) + outOffset);
+            byte[] garbage = new byte[compressed.capacity()];
+            new Random().nextBytes(garbage);
+            compressed.put(garbage);
+            compressed.position(outOffset).limit(compressed.capacity() - outOffset);
+
+            compressor.compress(src, compressed);
+            assertEquals(inOffset + n, src.position());
+            assertEquals(inOffset + n, src.limit());
+            assertEquals(compressed.capacity() - outOffset, compressed.limit());
+            compressed.flip().position(outOffset);
+            int len = compressed.remaining();
+
+            ByteBuffer result = typeOut.allocate(inOffset + n + inOffset);
+            result.position(inOffset).limit(result.capacity() - inOffset);
+            compressor.uncompress(compressed, result);
+            assertEquals(outOffset + len, compressed.position());
+            assertEquals(outOffset + len, compressed.limit());
+            assertEquals(result.capacity() - inOffset, result.limit());
+
+            int decompressed = result.position() - inOffset;
+            assert decompressed == n : "Failed uncompressed size";
+            for (int i = 0; i < n; ++i)
+                assert srcData[i] == result.get(inOffset + i) : "Failed comparison on index: " + i;
+        }
+        catch (Throwable e)
+        {
+            throw new AssertionError("Failed testing compressor " + compressor.getClass().getSimpleName() + " with buffer types in:" + typeIn + " compressed:" + typeComp + " out:" + typeOut, e);
+        }
     }
 
-    private WrappedByteBuffer makeWrappedBB(int size)
+    private ByteBuffer makeBB(int size)
     {
-        return compressor.useDirectOutputByteBuffers()
-                ? new WrappedByteBuffer(ByteBuffer.allocateDirect(size))
-                : new WrappedByteBuffer(ByteBuffer.allocate(size));
+        return compressor.preferredBufferType().allocate(size);
     }
 
     private void fillBBWithRandom(ByteBuffer dest)
     {
-        ByteBuffer dupe = dest.duplicate();
         byte[] random = new byte[dest.capacity()];
         new Random().nextBytes(random);
         dest.clear();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index ec280fa..70993d3 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.junit.Assert;
 import org.junit.Test;
+
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class DataOutputTest
@@ -176,7 +178,7 @@ public class DataOutputTest
     public void testSequentialWriter() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
-        final SequentialWriter writer = new SequentialWriter(file, 32, false);
+        final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
         DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
         DataInput canon = testWrite(write);
         write.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ef52030..ce0f918 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.junit.After;
 
 import junit.framework.Assert;
+
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 
 import static org.apache.commons.io.FileUtils.*;
@@ -66,7 +68,7 @@ public class SequentialWriterTest extends AbstractTransactionalTest
 
         protected TestableSW(File file) throws IOException
         {
-            this(file, new SequentialWriter(file, 8 << 10, true));
+            this(file, new SequentialWriter(file, 8 << 10, BufferType.OFF_HEAP));
         }
 
         protected TestableSW(File file, SequentialWriter sw) throws IOException