You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/01/16 15:39:53 UTC

cassandra git commit: Support direct buffer decompression for reads

Repository: cassandra
Updated Branches:
  refs/heads/trunk 3dd9c38a1 -> 895ec3ea1


Support direct buffer decompression for reads

Patch by tjake; reviewed by Branimir Lambov for CASSANDRA-8464


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/895ec3ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/895ec3ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/895ec3ea

Branch: refs/heads/trunk
Commit: 895ec3ea1e68b74ee025317f57a09c1ef0b512dd
Parents: 3dd9c38
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri Jan 16 09:37:06 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Jan 16 09:38:42 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra-env.sh                           |   1 +
 .../compress/CompressedRandomAccessReader.java  | 179 +++++++++++++++++--
 .../io/compress/CompressionMetadata.java        |  11 +-
 .../io/compress/DeflateCompressor.java          |  17 ++
 .../cassandra/io/compress/ICompressor.java      |  12 ++
 .../cassandra/io/compress/LZ4Compressor.java    |  43 ++++-
 .../cassandra/io/compress/SnappyCompressor.java |  13 ++
 .../cassandra/io/sstable/format/Version.java    |   2 -
 .../io/sstable/format/big/BigFormat.java        |  20 +--
 .../org/apache/cassandra/io/util/FileUtils.java |   5 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |   1 +
 .../cassandra/io/util/RandomAccessReader.java   |  16 +-
 .../compress/CompressedInputStream.java         |  15 +-
 .../compress/CompressedStreamReader.java        |   2 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  67 +++++++
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../cassandra/io/compress/CompressorTest.java   | 133 ++++++++++++++
 .../io/compress/LZ4CompressorTest.java          |  84 ---------
 .../compress/CompressedInputStreamTest.java     |   2 +-
 20 files changed, 474 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27b511a..79181e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Support direct buffer decompression for reads (CASSANDRA-8464)
  * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
  * Add role based access control (CASSANDRA-7653)
  * Group sstables for anticompaction correctly (CASSANDRA-8578)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/conf/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/conf/cassandra-env.sh b/conf/cassandra-env.sh
index 58605ca..f9641ed 100644
--- a/conf/cassandra-env.sh
+++ b/conf/cassandra-env.sh
@@ -251,6 +251,7 @@ fi
 
 # uncomment to have Cassandra JVM log internal method compilation (developers only)
 # JVM_OPTS="$JVM_OPTS -XX:+UnlockDiagnosticVMOptions -XX:+LogCompilation"
+# JVM_OPTS="$JVM_OPTS -XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
 
 # Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See
 # http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index dca5ade..57abba9 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,14 +19,22 @@ package org.apache.cassandra.io.compress;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.zip.Adler32;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
 
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.CompressedPoolingSegmentedFile;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.PoolingSegmentedFile;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
@@ -37,6 +45,8 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class CompressedRandomAccessReader extends RandomAccessReader
 {
+    private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
+
     public static CompressedRandomAccessReader open(String path, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner)
     {
         try
@@ -61,33 +71,96 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         }
     }
 
+    private TreeMap<Long, MappedByteBuffer> chunkSegments;
+    private int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+
     private final CompressionMetadata metadata;
 
     // we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer.
     private ByteBuffer compressed;
 
     // re-use single crc object
-    private final Checksum checksum;
+    private final Adler32 checksum;
 
     // raw checksum bytes
-    private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
+    private ByteBuffer checksumBytes;
 
     protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
     {
-        super(new File(dataFilePath), metadata.chunkLength(), owner);
+        super(new File(dataFilePath), metadata.chunkLength(), metadata.compressor().useDirectOutputByteBuffers(), owner);
         this.metadata = metadata;
-        checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
-        compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+        checksum = new Adler32();
+
+        if (!useMmap)
+        {
+            compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+            checksumBytes = ByteBuffer.wrap(new byte[4]);
+        }
+        else
+        {
+            try
+            {
+                createMappedSegments();
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+    }
+
+    private void createMappedSegments() throws IOException
+    {
+        chunkSegments = new TreeMap<>();
+        long offset = 0;
+        long lastSegmentOffset = 0;
+        long segmentSize = 0;
+
+        while (offset < metadata.dataLength)
+        {
+            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
+
+            //Reached a new mmap boundary
+            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
+            {
+                chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+                lastSegmentOffset += segmentSize;
+                segmentSize = 0;
+            }
+
+            segmentSize += chunk.length + 4; //checksum
+            offset += metadata.chunkLength();
+        }
+
+        if (segmentSize > 0)
+            chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
     }
 
-    protected ByteBuffer allocateBuffer(int bufferSize)
+    protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
     {
         assert Integer.bitCount(bufferSize) == 1;
-        return ByteBuffer.allocate(bufferSize);
+        return useMmap && useDirect
+                ? ByteBuffer.allocateDirect(bufferSize)
+                : ByteBuffer.allocate(bufferSize);
     }
 
     @Override
-    protected void reBuffer()
+    public void deallocate()
+    {
+        super.deallocate();
+
+        if (chunkSegments != null)
+        {
+            for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
+            {
+                FileUtils.clean(entry.getValue());
+            }
+        }
+
+        chunkSegments = null;
+    }
+
+    private void reBufferStandard()
     {
         try
         {
@@ -126,14 +199,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
             {
 
-                if (metadata.hasPostCompressionAdlerChecksums)
-                {
-                    checksum.update(compressed.array(), 0, chunk.length);
-                }
-                else
-                {
-                    checksum.update(buffer.array(), 0, decompressedBytes);
-                }
+                checksum.update(compressed.array(), 0, chunk.length);
 
                 if (checksum(chunk) != (int) checksum.getValue())
                     throw new CorruptBlockException(getPath(), chunk);
@@ -156,6 +222,81 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         }
     }
 
+    private void reBufferMmap()
+    {
+        try
+        {
+            long position = current();
+            assert position < metadata.dataLength;
+
+            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+            Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
+            long segmentOffset = entry.getKey();
+            int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+            MappedByteBuffer compressedChunk = entry.getValue();
+
+            compressedChunk.position(chunkOffset);
+            compressedChunk.limit(chunkOffset + chunk.length);
+            compressedChunk.mark();
+
+            buffer.clear();
+            int decompressedBytes;
+            try
+            {
+                decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer);
+                buffer.limit(decompressedBytes);
+            }
+            catch (IOException e)
+            {
+                throw new CorruptBlockException(getPath(), chunk);
+            }
+            finally
+            {
+                compressedChunk.limit(compressedChunk.capacity());
+            }
+
+            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+            {
+                compressedChunk.reset();
+                compressedChunk.limit(chunkOffset + chunk.length);
+
+                FBUtilities.directCheckSum(checksum, compressedChunk);
+
+                compressedChunk.limit(compressedChunk.capacity());
+
+
+                if (compressedChunk.getInt() != (int) checksum.getValue())
+                    throw new CorruptBlockException(getPath(), chunk);
+
+                // reset checksum object back to the original (blank) state
+                checksum.reset();
+            }
+
+            // buffer offset is always aligned
+            bufferOffset = position & ~(buffer.capacity() - 1);
+            buffer.position((int) (position - bufferOffset));
+        }
+        catch (CorruptBlockException e)
+        {
+            throw new CorruptSSTableException(e, getPath());
+        }
+
+    }
+
+    @Override
+    protected void reBuffer()
+    {
+        if (useMmap)
+        {
+            reBufferMmap();
+        }
+        else
+        {
+            reBufferStandard();
+        }
+    }
+
     private int checksum(CompressionMetadata.Chunk chunk) throws IOException
     {
         assert channel.position() == chunk.offset + chunk.length;
@@ -167,7 +308,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
     public int getTotalBufferSize()
     {
-        return super.getTotalBufferSize() + compressed.capacity();
+        return super.getTotalBufferSize() + (useMmap ? 0 : compressed.capacity());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 57d7cbe..6139a5c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -62,7 +62,6 @@ public class CompressionMetadata
 {
     public final long dataLength;
     public final long compressedFileLength;
-    public final boolean hasPostCompressionAdlerChecksums;
     private final Memory chunkOffsets;
     private final long chunkOffsetsSize;
     public final String indexFilePath;
@@ -82,14 +81,13 @@ public class CompressionMetadata
     public static CompressionMetadata create(String dataFilePath)
     {
         Descriptor desc = Descriptor.fromFilename(dataFilePath);
-        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums());
+        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
     }
 
     @VisibleForTesting
-    CompressionMetadata(String indexFilePath, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+    CompressionMetadata(String indexFilePath, long compressedLength)
     {
         this.indexFilePath = indexFilePath;
-        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 
         DataInputStream stream;
         try
@@ -137,13 +135,12 @@ public class CompressionMetadata
         this.chunkOffsetsSize = chunkOffsets.size();
     }
 
-    private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+    private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength)
     {
         this.indexFilePath = filePath;
         this.parameters = parameters;
         this.dataLength = dataLength;
         this.compressedFileLength = compressedLength;
-        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.chunkOffsets = offsets;
         offsets.reference();
         this.chunkOffsetsSize = offsetsSize;
@@ -342,7 +339,7 @@ public class CompressionMetadata
                 default:
                     throw new AssertionError();
             }
-            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
+            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 125a08f..546b506 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.io.compress;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
+
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -113,4 +116,18 @@ public class DeflateCompressor implements ICompressor
             throw new IOException(e);
         }
     }
+
+    public int uncompress(ByteBuffer input_, ByteBuffer output) throws IOException
+    {
+        if (!output.hasArray())
+            throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");
+
+        byte[] input = ByteBufferUtil.getArray(input_);
+        return uncompress(input, 0, input.length, output.array(), output.arrayOffset() + output.position());
+    }
+
+    public boolean useDirectOutputByteBuffers()
+    {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/ICompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index be76bc5..81d1425 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.compress;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Set;
 
 public interface ICompressor
@@ -28,6 +29,17 @@ public interface ICompressor
 
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
 
+    /**
+     * Decompression for DirectByteBuffers
+     */
+    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
+
+    /**
+     * Tells the caller whether this compressor wants/requires a direct output
+     * byte buffer when decompressing direct byte buffers
+     */
+    public boolean useDirectOutputByteBuffers();
+
     public Set<String> supportedOptions();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 0cf36c1..f458cb6 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.compress;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
@@ -25,10 +26,10 @@ import java.util.Set;
 
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
+import org.apache.cassandra.utils.FastByteOperations;
 
 public class LZ4Compressor implements ICompressor
 {
-
     private static final int INTEGER_BYTES = 4;
     private static final LZ4Compressor instance = new LZ4Compressor();
 
@@ -38,13 +39,13 @@ public class LZ4Compressor implements ICompressor
     }
 
     private final net.jpountz.lz4.LZ4Compressor compressor;
-    private final net.jpountz.lz4.LZ4Decompressor decompressor;
+    private final net.jpountz.lz4.LZ4FastDecompressor decompressor;
 
     private LZ4Compressor()
     {
         final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
         compressor = lz4Factory.fastCompressor();
-        decompressor = lz4Factory.decompressor();
+        decompressor = lz4Factory.fastDecompressor();
     }
 
     public int initialCompressedBufferLength(int chunkLength)
@@ -97,8 +98,42 @@ public class LZ4Compressor implements ICompressor
         return decompressedLength;
     }
 
+    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        int pos = input.position();
+        final int decompressedLength = (input.get(pos) & 0xFF)
+                | ((input.get(pos + 1) & 0xFF) << 8)
+                | ((input.get(pos + 2) & 0xFF) << 16)
+                | ((input.get(pos + 3) & 0xFF) << 24);
+
+        int inputLength = input.remaining() - INTEGER_BYTES;
+
+        final int compressedLength;
+        try
+        {
+            compressedLength = decompressor.decompress(input, input.position() + INTEGER_BYTES, output, output.position(), decompressedLength);
+        }
+        catch (LZ4Exception e)
+        {
+            throw new IOException(e);
+        }
+
+        if (compressedLength != inputLength)
+        {
+            throw new IOException("Compressed lengths mismatch: "+compressedLength+" vs "+inputLength);
+        }
+
+        return decompressedLength;
+    }
+
+    @Override
+    public boolean useDirectOutputByteBuffers()
+    {
+        return false;
+    }
+
     public Set<String> supportedOptions()
     {
-        return new HashSet<String>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
+        return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index 3583201..f5a2062 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -18,10 +18,12 @@
 package org.apache.cassandra.io.compress;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
@@ -95,4 +97,15 @@ public class SnappyCompressor implements ICompressor
     {
         return Snappy.rawUncompress(input, inputOffset, inputLength, output, outputOffset);
     }
+
+    public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        return Snappy.uncompress(input, output);
+    }
+
+    @Override
+    public boolean useDirectOutputByteBuffers()
+    {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 5da0cb8..faaa89e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -40,8 +40,6 @@ public abstract class Version
 
     public abstract boolean isLatestVersion();
 
-    public abstract boolean hasPostCompressionAdlerChecksums();
-
     public abstract boolean hasSamplingLevel();
 
     public abstract boolean hasNewStatsFile();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index eb43968..e1a5622 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -126,16 +126,8 @@ public class BigFormat implements SSTableFormat
     static class BigVersion extends Version
     {
         public static final String current_version = "la";
-        public static final String earliest_supported_version = "ja";
-
-        // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
-        //               this is mostly a marker to know if we should expect super columns or not. We do need
-        //               a major version bump however, because we should not allow streaming of super columns
-        //               into this new format)
-        //             tracks max local deletiontime in sstable metadata
-        //             records bloom_filter_fp_chance in metadata component
-        //             remove data size and column count from data file (CASSANDRA-4180)
-        //             tracks max/min column values (according to comparator)
+        public static final String earliest_supported_version = "jb";
+
         // jb (2.0.1): switch from crc32 to adler32 for compression checksums
         //             checksum the compressed data
         // ka (2.1.0): new Statistics.db file format
@@ -145,7 +137,6 @@ public class BigFormat implements SSTableFormat
         // la (3.0.0): new file name format
 
         private final boolean isLatestVersion;
-        private final boolean hasPostCompressionAdlerChecksums;
         private final boolean hasSamplingLevel;
         private final boolean newStatsFile;
         private final boolean hasAllAdlerChecksums;
@@ -158,7 +149,6 @@ public class BigFormat implements SSTableFormat
             super(instance,version);
 
             isLatestVersion = version.compareTo(current_version) == 0;
-            hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
             hasSamplingLevel = version.compareTo("ka") >= 0;
             newStatsFile = version.compareTo("ka") >= 0;
             hasAllAdlerChecksums = version.compareTo("ka") >= 0;
@@ -174,12 +164,6 @@ public class BigFormat implements SSTableFormat
         }
 
         @Override
-        public boolean hasPostCompressionAdlerChecksums()
-        {
-            return hasPostCompressionAdlerChecksums;
-        }
-
-        @Override
         public boolean hasSamplingLevel()
         {
             return hasSamplingLevel;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 080caa5..837cc6a 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -278,9 +278,10 @@ public class FileUtils
         return canCleanDirectBuffers;
     }
 
-    public static void clean(MappedByteBuffer buffer)
+    public static void clean(ByteBuffer buffer)
     {
-        ((DirectBuffer) buffer).cleaner().clean();
+        if (isCleanerAvailable() && buffer.isDirect())
+            ((DirectBuffer)buffer).cleaner().clean();
     }
 
     public static void createDirectory(String directory)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index bf120a3..6f2def0 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import sun.nio.ch.DirectBuffer;
 
 public class MmappedSegmentedFile extends SegmentedFile
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 58205d8..6bff378 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -53,6 +53,11 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
 
     protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException
     {
+        this(file, bufferSize, false, owner);
+    }
+
+    protected RandomAccessReader(File file, int bufferSize, boolean useDirectBuffer, PoolingSegmentedFile owner) throws FileNotFoundException
+    {
         this.owner = owner;
 
         filePath = file.getAbsolutePath();
@@ -79,13 +84,16 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         {
             throw new FSReadError(e, filePath);
         }
-        buffer = allocateBuffer(bufferSize);
+        buffer = allocateBuffer(bufferSize, useDirectBuffer);
         buffer.limit(0);
     }
 
-    protected ByteBuffer allocateBuffer(int bufferSize)
+    protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirectBuffer)
     {
-        return ByteBuffer.allocate((int) Math.min(fileLength, bufferSize));
+        int size = (int) Math.min(fileLength, bufferSize); // never allocate more than the file holds
+        return useDirectBuffer
+                ? ByteBuffer.allocateDirect(size)
+                : ByteBuffer.allocate(size);
     }
 
     public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
@@ -239,6 +247,8 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
     public void deallocate()
     {
         bufferOffset += buffer.position();
+        FileUtils.clean(buffer);
+
         buffer = null; // makes sure we don't use this after it's ostensibly closed
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..54f6eda 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -62,17 +62,15 @@ public class CompressedInputStream extends InputStream
     private static final byte[] POISON_PILL = new byte[0];
 
     private long totalCompressedBytesRead;
-    private final boolean hasPostCompressionAdlerChecksums;
 
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
      */
-    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
+    public CompressedInputStream(InputStream source, CompressionInfo info)
     {
         this.info = info;
-        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
-        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+        this.checksum =  new Adler32();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
@@ -117,14 +115,7 @@ public class CompressedInputStream extends InputStream
         // validate crc randomly
         if (info.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
         {
-            if (hasPostCompressionAdlerChecksums)
-            {
-                checksum.update(compressed, 0, compressed.length - checksumBytes.length);
-            }
-            else
-            {
-                checksum.update(buffer, 0, validBufferBytes);
-            }
+            checksum.update(compressed, 0, compressed.length - checksumBytes.length);
 
             System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
             if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0595e0c..46f7d4f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -76,7 +76,7 @@ public class CompressedStreamReader extends StreamReader
 
         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums());
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 0462e5e..c9024ec 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
 
 import java.io.*;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.math.BigInteger;
 import java.net.*;
 import java.nio.ByteBuffer;
@@ -26,10 +28,12 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.primitives.Ints;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +48,8 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.IAllocator;
@@ -634,6 +640,67 @@ public class FBUtilities
         checksum.update((v >>> 0) & 0xFF);
     }
 
+    private static Method directUpdate; // Adler32.update(ByteBuffer) looked up reflectively; null when it cannot be used
+    static
+    {
+        try
+        {
+            Method update = Adler32.class.getDeclaredMethod("update", ByteBuffer.class);
+            update.setAccessible(true); // may throw SecurityException, which must not abort class initialisation
+            directUpdate = update; // published only once the handle is fully usable
+        } catch (NoSuchMethodException | SecurityException e)
+        {
+            logger.warn("JVM doesn't support Adler32 byte buffer access");
+        }
+    }
+
+    private static final ThreadLocal<byte[]> localDigestBuffer = new ThreadLocal<byte[]>() // per-thread scratch array for the copying fallback in directCheckSum()
+    {
+        @Override
+        protected byte[] initialValue()
+        {
+            return new byte[CompressionParameters.DEFAULT_CHUNK_LENGTH]; // longer inputs are checksummed in slices of this length
+        }
+    };
+
+    // Adler32.update(ByteBuffer) exists in Java 7 but is private until Java 8, hence the reflective handle; true while that handle is set.
+    public static boolean supportsDirectChecksum()
+    {
+        return directUpdate != null;
+    }
+
+    /** Adds bb's remaining bytes to checksum, leaving bb at its limit; copies via a heap array when the direct call is unavailable. */
+    public static void directCheckSum(Adler32 checksum, ByteBuffer bb)
+    {
+        Method update = directUpdate; // read once: the field can be nulled concurrently between the check and the call
+        if (update != null)
+        {
+            try
+            {
+                update.invoke(checksum, bb);
+                return;
+            }
+            catch (IllegalAccessException e)
+            {
+                directUpdate = null; // give up on reflection; this and later calls use the copying path below
+                logger.warn("JVM doesn't support Adler32 byte buffer access");
+            }
+            catch (InvocationTargetException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        // Fallback: feed the checksum through a per-thread array, at most buffer.length bytes per pass
+        byte[] buffer = localDigestBuffer.get();
+        while (bb.hasRemaining())
+        {
+            int length = Math.min(bb.remaining(), buffer.length);
+            ByteBufferUtil.arrayCopy(bb, bb.position(), buffer, 0, length);
+            bb.position(bb.position() + length);
+            checksum.update(buffer, 0, length);
+        }
+    }
+
     public static long abs(long index)
     {
         long negbit = index >> 63;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 900abd8..58bf5cb 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -80,7 +80,7 @@ public class CompressedRandomAccessReaderTest
                 writer.write("x".getBytes());
             writer.close();
 
-            CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true));
+            CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()));
             String res = reader.readLine();
             assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
             assertEquals(40, res.length());
@@ -123,7 +123,7 @@ public class CompressedRandomAccessReaderTest
 
             assert f.exists();
             RandomAccessReader reader = compressed
-                                      ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true))
+                                      ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()))
                                       : RandomAccessReader.open(f);
             String expected = "The quick brown fox jumps over the lazy dog";
             assertEquals(expected.length(), reader.length());
@@ -160,7 +160,7 @@ public class CompressedRandomAccessReaderTest
         writer.close();
 
         // open compression metadata and get chunk information
-        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), true);
+        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
         CompressionMetadata.Chunk chunk = meta.chunkFor(0);
 
         RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
new file mode 100644
index 0000000..04396e0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Random;
+
+import com.google.common.io.Files;
+import org.apache.cassandra.io.compress.ICompressor.WrappedArray;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/** Round-trips data through each compressor listed in {@code compressors}, from byte arrays and from a mapped file. */
+public class CompressorTest
+{
+    ICompressor compressor; // target of the helper methods below; reassigned by testAllCompressors() for every entry
+
+    ICompressor[] compressors = new ICompressor[] {
+            LZ4Compressor.create(Collections.<String, String>emptyMap()),
+            DeflateCompressor.create(Collections.<String, String>emptyMap()),
+            SnappyCompressor.create(Collections.<String, String>emptyMap())
+    };
+
+    @Test
+    public void testAllCompressors() throws IOException
+    {
+        for (ICompressor compressor : compressors)
+        {
+            this.compressor = compressor;
+
+            testEmptyArray();
+            testLongArray();
+            testShortArray();
+            testMappedFile();
+        }
+    }
+
+    /** Compresses data[off, off + len) and restores it, both at non-zero buffer offsets, then compares with the input. */
+    public void test(byte[] data, int off, int len) throws IOException
+    {
+        final String name = compressor.getClass().getSimpleName(); // the loop above would otherwise hide which one failed
+        final int outOffset = 3;
+        final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(len)]);
+        new Random().nextBytes(out.buffer);
+        final int compressedLength = compressor.compress(data, off, len, out, outOffset);
+        final int restoredOffset = 5;
+        final byte[] restored = new byte[restoredOffset + len];
+        new Random().nextBytes(restored);
+        final int decompressedLength = compressor.uncompress(out.buffer, outOffset, compressedLength, restored, restoredOffset);
+        assertEquals(name + ": decompressed length", len, decompressedLength); // JUnit order is (expected, actual)
+        assertArrayEquals(name + ": restored bytes", Arrays.copyOfRange(data, off, off + len),
+                          Arrays.copyOfRange(restored, restoredOffset, restoredOffset + decompressedLength));
+    }
+
+    public void test(byte[] data) throws IOException
+    {
+        test(data, 0, data.length);
+    }
+
+    public void testEmptyArray() throws IOException
+    {
+        test(new byte[0]);
+    }
+
+    public void testShortArray() throws UnsupportedEncodingException, IOException
+    {
+        test("Cassandra".getBytes("UTF-8"), 1, 7);
+    }
+
+    public void testLongArray() throws UnsupportedEncodingException, IOException
+    {
+        byte[] data = new byte[1 << 20];
+        test(data, 13, 1 << 19); // all zeros
+        new Random(0).nextBytes(data);
+        test(data, 13, 1 << 19); // pseudo-random bytes
+    }
+
+    /** Decompresses straight from a memory-mapped file into the buffer kind reported by useDirectOutputByteBuffers(). */
+    public void testMappedFile() throws IOException
+    {
+        byte[] data = new byte[1 << 20];
+        new Random(0).nextBytes(data); // fixed seed, as in testLongArray(), so a failure can be reproduced
+
+        //create a temp file
+        File temp = File.createTempFile("tempfile", ".tmp");
+        temp.deleteOnExit();
+
+        //Prepend some random bytes to the output and compress
+        final int outOffset = 3;
+        final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(data.length)]);
+        new Random().nextBytes(out.buffer);
+        final int compressedLength = compressor.compress(data, 0, data.length, out, outOffset);
+        Files.write(out.buffer, temp);
+
+        MappedByteBuffer mappedData = Files.map(temp);
+        mappedData.position(outOffset); // skip the random prefix
+        mappedData.limit(compressedLength+outOffset);
+
+        ByteBuffer result = compressor.useDirectOutputByteBuffers()
+                ? ByteBuffer.allocateDirect(data.length + 100)
+                : ByteBuffer.allocate(data.length + 100);
+
+        int length = compressor.uncompress(mappedData, result);
+        Assert.assertEquals(data.length, length);
+        for (int i = 0; i < length; i++)
+        {
+            Assert.assertEquals("Decompression mismatch at byte "+i, data[i], result.get());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java
deleted file mode 100644
index 56ffdf1..0000000
--- a/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.compress;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Random;
-
-import org.apache.cassandra.io.compress.ICompressor.WrappedArray;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class LZ4CompressorTest
-{
-
-    LZ4Compressor compressor;
-
-    @Before
-    public void setUp()
-    {
-        compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
-    }
-
-    public void test(byte[] data, int off, int len) throws IOException
-    {
-        final int outOffset = 3;
-        final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(len)]);
-        new Random().nextBytes(out.buffer);
-        final int compressedLength = compressor.compress(data, off, len, out, outOffset);
-        final int restoredOffset = 5;
-        final byte[] restored = new byte[restoredOffset + len];
-        new Random().nextBytes(restored);
-        final int decompressedLength = compressor.uncompress(out.buffer, outOffset, compressedLength, restored, restoredOffset);
-        assertEquals(decompressedLength, len);
-        assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
-                          Arrays.copyOfRange(restored, restoredOffset, restoredOffset + decompressedLength));
-    }
-
-    public void test(byte[] data) throws IOException
-    {
-        test(data, 0, data.length);
-    }
-
-    @Test
-    public void testEmptyArray() throws IOException
-    {
-        test(new byte[0]);
-    }
-
-    @Test
-    public void testShortArray() throws UnsupportedEncodingException, IOException
-    {
-        test("Cassandra".getBytes("UTF-8"), 1, 7);
-    }
-
-    @Test
-    public void testLongArray() throws UnsupportedEncodingException, IOException
-    {
-        byte[] data = new byte[1 << 20];
-        test(data, 13, 1 << 19);
-        new Random(0).nextBytes(data);
-        test(data, 13, 1 << 19);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 42a83a0..128ec3c 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -111,7 +111,7 @@ public class CompressedInputStreamTest
 
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
-        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
         DataInputStream in = new DataInputStream(input);
 
         for (int i = 0; i < sections.size(); i++)