Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/03/11 17:45:19 UTC

cassandra git commit: Convert SequentialWriter to nio

Repository: cassandra
Updated Branches:
  refs/heads/trunk c38ea0b1b -> bc7941c9d


Convert SequentialWriter to nio

Patch by jmckenzie; reviewed by tjake for CASSANDRA-8709


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc7941c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc7941c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc7941c9

Branch: refs/heads/trunk
Commit: bc7941c9da7ad17fdf4793de411541876a1248ff
Parents: c38ea0b
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Mar 11 11:43:04 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Mar 11 11:43:04 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java | 112 +++++++----
 .../io/compress/CompressionMetadata.java        |   1 +
 .../io/compress/DeflateCompressor.java          |  21 +-
 .../cassandra/io/compress/ICompressor.java      |  25 ++-
 .../cassandra/io/compress/LZ4Compressor.java    |  28 +--
 .../cassandra/io/compress/SnappyCompressor.java |   9 +-
 .../io/util/ChecksummedSequentialWriter.java    |  12 +-
 .../io/util/DataIntegrityMetadata.java          |  25 ++-
 .../cassandra/io/util/RandomAccessReader.java   |   2 +-
 .../cassandra/io/util/SequentialWriter.java     | 200 +++++++------------
 .../org/apache/cassandra/utils/FBUtilities.java |   3 +-
 .../CompressedRandomAccessReaderTest.java       |   3 +-
 .../CompressedSequentialWriterTest.java         | 136 +++++++++++++
 .../cassandra/io/compress/CompressorTest.java   | 159 ++++++++++++---
 .../cassandra/io/util/DataOutputTest.java       |  16 +-
 .../compress/CompressedInputStreamTest.java     |   4 +-
 17 files changed, 503 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 760068a..b999577 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Convert SequentialWriter to nio (CASSANDRA-8709)
  * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
  * Record client ip address in tracing sessions (CASSANDRA-8162)
  * Indicate partition key columns in response metadata for prepared

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 3fc35db..b4d9dcc 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -20,18 +20,19 @@ package org.apache.cassandra.io.compress;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.zip.Adler32;
-import java.util.zip.Checksum;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.FINAL;
 import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED;
@@ -50,31 +51,35 @@ public class CompressedSequentialWriter extends SequentialWriter
     private final ICompressor compressor;
 
     // used to store compressed data
-    private final ICompressor.WrappedArray compressed;
+    private final ICompressor.WrappedByteBuffer compressed;
 
     // holds a number of already written chunks
     private int chunkCount = 0;
 
-    private long originalSize = 0, compressedSize = 0;
+    private long uncompressedSize = 0, compressedSize = 0;
 
     private final MetadataCollector sstableMetadataCollector;
 
+    private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4);
+
     public CompressedSequentialWriter(File file,
                                       String offsetsPath,
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
     {
-        super(file, parameters.chunkLength());
+        super(file, parameters.chunkLength(), parameters.sstableCompressor.useDirectOutputByteBuffers());
         this.compressor = parameters.sstableCompressor;
 
         // buffer for compression should be the same size as buffer itself
-        compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
+        compressed = compressor.useDirectOutputByteBuffers()
+            ? new ICompressor.WrappedByteBuffer(ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(buffer.capacity())))
+            : new ICompressor.WrappedByteBuffer(ByteBuffer.allocate(compressor.initialCompressedBufferLength(buffer.capacity())));
 
         /* Index File (-CompressionInfo.db component) and its header */
         metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
-        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
+        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStreamAndChannel(channel));
     }
 
     @Override
@@ -82,7 +87,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     {
         try
         {
-            return out.getFilePointer();
+            return channel.position();
         }
         catch (IOException e)
         {
@@ -111,14 +116,19 @@ public class CompressedSequentialWriter extends SequentialWriter
         try
         {
             // compressing data with buffer re-use
-            compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+            buffer.flip();
+            compressed.buffer.clear();
+            compressedLength = compressor.compress(buffer, compressed);
+
+            // Compressors don't modify sentinels in our BB - we rely on buffer.position() for bufferOffset adjustment
+            buffer.position(buffer.limit());
         }
         catch (IOException e)
         {
             throw new RuntimeException("Compression exception", e); // shouldn't happen
         }
 
-        originalSize += validBufferBytes;
+        uncompressedSize += buffer.position();
         compressedSize += compressedLength;
 
         try
@@ -127,13 +137,19 @@ public class CompressedSequentialWriter extends SequentialWriter
             metadataWriter.addOffset(chunkOffset);
             chunkCount++;
 
-            assert compressedLength <= compressed.buffer.length;
+            assert compressedLength <= compressed.buffer.capacity();
+
+            // write out the compressed data
+            compressed.buffer.flip();
+            channel.write(compressed.buffer);
 
-            // write data itself
-            out.write(compressed.buffer, 0, compressedLength);
             // write corresponding checksum
-            crcMetadata.append(compressed.buffer, 0, compressedLength, true);
+            compressed.buffer.rewind();
+            crcMetadata.appendDirect(compressed.buffer);
             lastFlushOffset += compressedLength + 4;
+
+            // adjust our bufferOffset to account for the new uncompressed data we've now written out
+            resetBuffer();
         }
         catch (IOException e)
         {
@@ -149,7 +165,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     public CompressionMetadata open(long overrideLength, boolean isFinal)
     {
         if (overrideLength <= 0)
-            return metadataWriter.open(originalSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
+            return metadataWriter.open(uncompressedSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
         // we are early opening the file, make sure we open metadata with the correct size
         assert !isFinal;
         return metadataWriter.open(overrideLength, chunkOffset, SHARED);
@@ -158,7 +174,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     @Override
     public FileMark mark()
     {
-        return new CompressedFileWriterMark(chunkOffset, current, validBufferBytes, chunkCount + 1);
+        return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
     }
 
     @Override
@@ -169,46 +185,54 @@ public class CompressedSequentialWriter extends SequentialWriter
         CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;
 
         // reset position
-        current = realMark.uncDataOffset;
+        long truncateTarget = realMark.uncDataOffset;
 
-        if (realMark.chunkOffset == chunkOffset) // current buffer
+        if (realMark.chunkOffset == chunkOffset)
         {
-            // just reset a buffer offset and return
-            validBufferBytes = realMark.bufferOffset;
+            // simply drop bytes to the right of our mark
+            buffer.position(realMark.validBufferBytes);
             return;
         }
 
-        // synchronize current buffer with disk
-        // because we don't want any data loss
+        // synchronize current buffer with disk - we don't want any data loss
         syncInternal();
 
-        // setting marker as a current offset
         chunkOffset = realMark.chunkOffset;
 
         // compressed chunk size (- 4 bytes reserved for checksum)
         int chunkSize = (int) (metadataWriter.chunkOffsetBy(realMark.nextChunkIndex) - chunkOffset - 4);
-        if (compressed.buffer.length < chunkSize)
-            compressed.buffer = new byte[chunkSize];
+        if (compressed.buffer.capacity() < chunkSize)
+            compressed.buffer = compressor.useDirectOutputByteBuffers()
+                    ? ByteBuffer.allocateDirect(chunkSize)
+                    : ByteBuffer.allocate(chunkSize);
 
         try
         {
-            out.seek(chunkOffset);
-            out.readFully(compressed.buffer, 0, chunkSize);
+            compressed.buffer.clear();
+            compressed.buffer.limit(chunkSize);
+            channel.position(chunkOffset);
+            channel.read(compressed.buffer);
 
             try
             {
-                // repopulate buffer
-                compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
+                // Repopulate buffer from compressed data
+                buffer.clear();
+                compressed.buffer.flip();
+                compressor.uncompress(compressed.buffer, buffer);
             }
             catch (IOException e)
             {
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
             }
 
-            Checksum checksum = new Adler32();
-            checksum.update(compressed.buffer, 0, chunkSize);
+            Adler32 checksum = new Adler32();
 
-            if (out.readInt() != (int) checksum.getValue())
+            FBUtilities.directCheckSum(checksum, compressed.buffer);
+
+            crcCheckBuffer.clear();
+            channel.read(crcCheckBuffer);
+            crcCheckBuffer.flip();
+            if (crcCheckBuffer.getInt() != (int) checksum.getValue())
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
         }
         catch (CorruptBlockException e)
@@ -224,9 +248,11 @@ public class CompressedSequentialWriter extends SequentialWriter
             throw new FSReadError(e, getPath());
         }
 
-        // reset buffer
-        validBufferBytes = realMark.bufferOffset;
-        bufferOffset = current - validBufferBytes;
+        // Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
+        buffer.position(realMark.validBufferBytes);
+        isDirty = true;
+
+        bufferOffset = truncateTarget - buffer.position();
         chunkCount = realMark.nextChunkIndex - 1;
 
         // truncate data and index file
@@ -243,7 +269,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         {
             try
             {
-                out.seek(chunkOffset);
+                channel.position(chunkOffset);
             }
             catch (IOException e)
             {
@@ -256,13 +282,15 @@ public class CompressedSequentialWriter extends SequentialWriter
     public void close()
     {
         if (buffer == null)
-            return; // already closed
+            return;
+
+        long finalPosition = current();
 
         super.close();
-        sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
+        sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
         try
         {
-            metadataWriter.close(current, chunkCount);
+            metadataWriter.close(finalPosition, chunkCount);
         }
         catch (IOException e)
         {
@@ -292,14 +320,14 @@ public class CompressedSequentialWriter extends SequentialWriter
         // uncompressed data offset (real data offset)
         final long uncDataOffset;
 
-        final int bufferOffset;
+        final int validBufferBytes;
         final int nextChunkIndex;
 
-        public CompressedFileWriterMark(long chunkOffset, long uncDataOffset, int bufferOffset, int nextChunkIndex)
+        public CompressedFileWriterMark(long chunkOffset, long uncDataOffset, int validBufferBytes, int nextChunkIndex)
         {
             this.chunkOffset = chunkOffset;
             this.uncDataOffset = uncDataOffset;
-            this.bufferOffset = bufferOffset;
+            this.validBufferBytes = validBufferBytes;
             this.nextChunkIndex = nextChunkIndex;
         }
     }

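Note: flushData() above follows the standard NIO buffer cycle: fill the
uncompressed buffer, flip it for reading, compress into a cleared output
buffer, flip that, and write it to the channel. A minimal, self-contained
sketch of that cycle (a plain copy stands in for the compressor; the class
and file names are illustrative only):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class BufferCycleSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);     // uncompressed staging
            ByteBuffer compressed = ByteBuffer.allocate(64 * 1024); // "compressed" output
            try (FileChannel channel = FileChannel.open(Paths.get("sketch.db"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE))
            {
                buffer.put("some chunk data".getBytes());

                buffer.flip();         // switch the staging buffer from writing to reading
                compressed.clear();    // reset the output buffer for the compressor
                compressed.put(buffer);       // stand-in for compressor.compress(buffer, compressed)

                compressed.flip();     // expose only the bytes actually produced
                channel.write(compressed);

                buffer.clear();        // the equivalent of resetBuffer() for the next chunk
            }
        }
    }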
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 7237cb9..182cdd2 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -134,6 +134,7 @@ public class CompressionMetadata
         {
             FileUtils.closeQuietly(stream);
         }
+
         this.chunkOffsetsSize = chunkOffsets.size();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 546b506..bede4da 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -71,29 +71,34 @@ public class DeflateCompressor implements ICompressor
         return chunkLength;
     }
 
-    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset)
+    public int compress(ByteBuffer src, ICompressor.WrappedByteBuffer dest)
     {
+        assert dest.buffer.hasArray();
+
         Deflater def = deflater.get();
         def.reset();
-        def.setInput(input, inputOffset, inputLength);
+        def.setInput(src.array(), src.position(), src.limit());
         def.finish();
         if (def.needsInput())
             return 0;
 
-        int offs = outputOffset;
+        int startPos = dest.buffer.position();
         while (true)
         {
-            offs += def.deflate(output.buffer, offs, output.buffer.length - offs);
+            int arrayOffset = dest.buffer.arrayOffset();
+            int len = def.deflate(dest.buffer.array(), arrayOffset + dest.buffer.position(), dest.buffer.remaining());
+            dest.buffer.position(dest.buffer.position() + len);
             if (def.finished())
             {
-                return offs - outputOffset;
+                return dest.buffer.position() - startPos;
             }
             else
             {
                 // We're not done, output was too small. Increase it and continue
-                byte[] newBuffer = new byte[(output.buffer.length*4)/3 + 1];
-                System.arraycopy(output.buffer, 0, newBuffer, 0, offs);
-                output.buffer = newBuffer;
+                ByteBuffer newDest = ByteBuffer.allocate(dest.buffer.capacity()*4/3 + 1);
+                dest.buffer.rewind();
+                newDest.put(dest.buffer);
+                dest.buffer = newDest;
             }
         }
     }

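Note: the resize branch above grows the destination by roughly 4/3 whenever
Deflater runs out of room, copying the output produced so far into the larger
buffer. The same grow-and-copy idiom in isolation (this sketch bounds the copy
with flip() to the bytes actually written; method and class names are
illustrative):

    import java.nio.ByteBuffer;

    public class GrowBufferSketch
    {
        // Replace a full heap buffer with one ~4/3 its size, preserving the
        // bytes written so far and leaving position ready for more output.
        static ByteBuffer grow(ByteBuffer old)
        {
            ByteBuffer bigger = ByteBuffer.allocate(old.capacity() * 4 / 3 + 1);
            old.flip();        // limit = bytes written so far, position = 0
            bigger.put(old);   // bigger.position() ends at the old write position
            return bigger;
        }

        public static void main(String[] args)
        {
            ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putLong(42L);                 // buffer is now full
            buf = grow(buf);
            System.out.println(buf.remaining() + " bytes of headroom left");
        }
    }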
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/ICompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index 81d1425..0326a9f 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -25,11 +25,14 @@ public interface ICompressor
 {
     public int initialCompressedBufferLength(int chunkLength);
 
-    public int compress(byte[] input, int inputOffset, int inputLength, WrappedArray output, int outputOffset) throws IOException;
-
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
 
     /**
+     * Compression for ByteBuffers
+     */
+    public int compress(ByteBuffer input, WrappedByteBuffer output) throws IOException;
+
+    /**
      * Decompression for DirectByteBuffers
      */
     public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
@@ -43,18 +46,18 @@ public interface ICompressor
     public Set<String> supportedOptions();
 
     /**
-     * A simple wrapper of a byte array.
-     * Not all implementation allows to know what is the maximum size after
-     * compression. This make it hard to size the ouput buffer for compress
-     * (and we want to reuse the buffer).  Instead we use this wrapped buffer
-     * so that compress can have the liberty to resize underlying array if
-     * need be.
+     * A simple wrapped ByteBuffer.
+     * Not all implementations allow us to know the maximum size after
+     * compression. This makes it hard to size the output buffer for compression
+     * (and we want to reuse the buffer).  Instead we use this wrapped ByteBuffer
+     * so that compress(...) can have the liberty to resize the underlying array if
+     * necessary.
      */
-    public static class WrappedArray
+    public static class WrappedByteBuffer
     {
-        public byte[] buffer;
+        public ByteBuffer buffer;
 
-        public WrappedArray(byte[] buffer)
+        public WrappedByteBuffer(ByteBuffer buffer)
         {
             this.buffer = buffer;
         }

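Note: with the interface above, a caller flips the source so compress(...)
reads position..limit, hands over a WrappedByteBuffer sized via
initialCompressedBufferLength(), and afterwards re-reads through the wrapper
in case the implementation swapped in a larger buffer. A hedged caller sketch
(LZ4Compressor.instance is used here only because the patch makes it public
for testing):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.cassandra.io.compress.ICompressor;
    import org.apache.cassandra.io.compress.LZ4Compressor;

    public class CompressCallerSketch
    {
        public static void main(String[] args) throws IOException
        {
            ICompressor compressor = LZ4Compressor.instance;
            byte[] data = "hello, nio".getBytes();

            ByteBuffer src = ByteBuffer.allocate(data.length);
            src.put(data);
            src.flip();                          // compress(...) reads position..limit

            ICompressor.WrappedByteBuffer dest = new ICompressor.WrappedByteBuffer(
                ByteBuffer.allocate(compressor.initialCompressedBufferLength(src.remaining())));

            int compressedLength = compressor.compress(src, dest);

            // Read back through the wrapper: compress(...) may have resized dest.buffer.
            dest.buffer.flip();
            System.out.println(compressedLength + " compressed bytes");
        }
    }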
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index f32f266..ab10a00 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -24,13 +24,16 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 
 public class LZ4Compressor implements ICompressor
 {
     private static final int INTEGER_BYTES = 4;
-    private static final LZ4Compressor instance = new LZ4Compressor();
+
+    @VisibleForTesting
+    public static final LZ4Compressor instance = new LZ4Compressor();
 
     public static LZ4Compressor create(Map<String, String> args)
     {
@@ -52,18 +55,20 @@ public class LZ4Compressor implements ICompressor
         return INTEGER_BYTES + compressor.maxCompressedLength(chunkLength);
     }
 
-    public int compress(byte[] input, int inputOffset, int inputLength, WrappedArray output, int outputOffset) throws IOException
+    public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
     {
-        final byte[] dest = output.buffer;
-        dest[outputOffset] = (byte) inputLength;
-        dest[outputOffset + 1] = (byte) (inputLength >>> 8);
-        dest[outputOffset + 2] = (byte) (inputLength >>> 16);
-        dest[outputOffset + 3] = (byte) (inputLength >>> 24);
-        final int maxCompressedLength = compressor.maxCompressedLength(inputLength);
+        final ByteBuffer buf = dest.buffer;
+        int len = src.remaining();
+        dest.buffer.put((byte) len);
+        dest.buffer.put((byte) (len >>> 8));
+        dest.buffer.put((byte) (len >>> 16));
+        dest.buffer.put((byte) (len >>> 24));
+
+        int start = dest.buffer.position();
         try
         {
-            return INTEGER_BYTES + compressor.compress(input, inputOffset, inputLength,
-                                                       dest, outputOffset + INTEGER_BYTES, maxCompressedLength);
+            compressor.compress(src, dest.buffer);
+            return INTEGER_BYTES + (buf.position() - start);
         }
         catch (LZ4Exception e)
         {
@@ -104,7 +109,6 @@ public class LZ4Compressor implements ICompressor
                 | ((input.get(pos + 1) & 0xFF) << 8)
                 | ((input.get(pos + 2) & 0xFF) << 16)
                 | ((input.get(pos + 3) & 0xFF) << 24);
-
         int inputLength = input.remaining() - INTEGER_BYTES;
 
         final int compressedLength;
@@ -119,7 +123,7 @@ public class LZ4Compressor implements ICompressor
 
         if (compressedLength != inputLength)
         {
-            throw new IOException("Compressed lengths mismatch: "+compressedLength+" vs "+inputLength);
+            throw new IOException("Compressed lengths mismatch - got: "+compressedLength+" vs expected: "+inputLength);
         }
 
         return decompressedLength;

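Note: the four put() calls above prepend the uncompressed length as a
little-endian int header in front of the LZ4 block, matching the
mask-and-shift decode in uncompress(). The encode/decode pair in isolation:

    import java.nio.ByteBuffer;

    public class Lz4LengthHeaderSketch
    {
        static void putLength(ByteBuffer out, int len)
        {
            out.put((byte) len);           // low byte first (little-endian)
            out.put((byte) (len >>> 8));
            out.put((byte) (len >>> 16));
            out.put((byte) (len >>> 24));
        }

        static int getLength(ByteBuffer in, int pos)
        {
            return (in.get(pos) & 0xFF)
                 | ((in.get(pos + 1) & 0xFF) << 8)
                 | ((in.get(pos + 2) & 0xFF) << 16)
                 | ((in.get(pos + 3) & 0xFF) << 24);
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = ByteBuffer.allocate(4);
            putLength(bb, 123456789);
            System.out.println(getLength(bb, 0) == 123456789); // prints true
        }
    }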
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index 0094042..7a91df1 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -87,9 +87,14 @@ public class SnappyCompressor implements ICompressor
         return Snappy.maxCompressedLength(chunkLength);
     }
 
-    public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+    public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
     {
-        return Snappy.rawCompress(input, inputOffset, inputLength, output.buffer, outputOffset);
+        int result = Snappy.compress(src, dest.buffer);
+
+        // Snappy doesn't leave the dest ByteBuffer's counters in the same state LZ4 and Deflate do
+        dest.buffer.position(dest.buffer.limit());
+        dest.buffer.limit(dest.buffer.capacity());
+        return result;
     }
 
     public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException

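Note: judging by the comment above, Snappy.compress(ByteBuffer, ByteBuffer)
leaves the destination's position at the start of its output and its limit at
the end, so the two extra lines advance position past the data and restore the
limit to match the LZ4/Deflate convention. A standalone sketch of that
normalization (snappy-java's ByteBuffer API requires direct buffers):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.xerial.snappy.Snappy;

    public class SnappyCounterSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteBuffer src = ByteBuffer.allocateDirect(16);
            src.put("hello snappy 123".getBytes()).flip();
            ByteBuffer dst = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(src.remaining()));

            int written = Snappy.compress(src, dst);

            // Move past the compressed data and re-open the buffer for appending,
            // as SnappyCompressor.compress now does after the Snappy call.
            dst.position(dst.limit());
            dst.limit(dst.capacity());
            System.out.println(written + " bytes of compressed output");
        }
    }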
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 526347b..5c5637a 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.util;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 
@@ -28,16 +29,19 @@ public class ChecksummedSequentialWriter extends SequentialWriter
 
     public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
     {
-        super(file, bufferSize);
-        crcWriter = new SequentialWriter(crcPath, 8 * 1024);
+        super(file, bufferSize, false);
+        crcWriter = new SequentialWriter(crcPath, 8 * 1024, false);
         crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
-        crcMetadata.writeChunkSize(buffer.length);
+        crcMetadata.writeChunkSize(buffer.capacity());
     }
 
     protected void flushData()
     {
         super.flushData();
-        crcMetadata.append(buffer, 0, validBufferBytes, false);
+        ByteBuffer toAppend = buffer.duplicate();
+        toAppend.position(0);
+        toAppend.limit(buffer.position());
+        crcMetadata.appendDirect(toAppend);
     }
 
     public void writeFullChecksum(Descriptor descriptor)

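Note: the new flushData() checksums exactly the bytes just flushed without
disturbing the writer's buffer, because duplicate() shares content but gives
an independent position and limit. The idiom on its own (using Java 8's
Adler32 ByteBuffer overload in place of the ChecksumWriter):

    import java.nio.ByteBuffer;
    import java.util.zip.Adler32;

    public class DuplicateChecksumSketch
    {
        public static void main(String[] args)
        {
            ByteBuffer buffer = ByteBuffer.allocate(32);
            buffer.put("freshly flushed bytes".getBytes());

            ByteBuffer toAppend = buffer.duplicate(); // shared bytes, private counters
            toAppend.position(0);
            toAppend.limit(buffer.position());        // bound to what was written

            Adler32 checksum = new Adler32();
            checksum.update(toAppend);                // consumes toAppend, not buffer
            System.out.printf("adler32=%08x, writer position intact at %d%n",
                              checksum.getValue(), buffer.position());
        }
    }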
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 281eb7b..464f9d2 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.CRC32Factory;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class DataIntegrityMetadata
 {
@@ -87,9 +88,9 @@ public class DataIntegrityMetadata
 
     public static class ChecksumWriter
     {
-        private final Checksum incrementalChecksum = new Adler32();
+        private final Adler32 incrementalChecksum = new Adler32();
         private final DataOutput incrementalOut;
-        private final Checksum fullChecksum = new Adler32();
+        private final Adler32 fullChecksum = new Adler32();
 
         public ChecksumWriter(DataOutput incrementalOut)
         {
@@ -141,6 +142,26 @@ public class DataIntegrityMetadata
             }
         }
 
+        public void appendDirect(ByteBuffer bb)
+        {
+            try
+            {
+                ByteBuffer toAppend = bb.duplicate();
+                toAppend.mark();
+                FBUtilities.directCheckSum(incrementalChecksum, toAppend);
+                toAppend.reset();
+
+                incrementalOut.writeInt((int) incrementalChecksum.getValue());
+                incrementalChecksum.reset();
+
+                FBUtilities.directCheckSum(fullChecksum, toAppend);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
         public void writeFullChecksum(Descriptor descriptor)
         {
             File outFile = new File(descriptor.filenameFor(Component.DIGEST));

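Note: appendDirect() above has to feed the same bytes through both the
per-chunk and the whole-file checksum; mark()/reset() rewinds the duplicate
between the two passes. A compact sketch of that double pass (calling the
Java 8 Adler32 overload directly where the patch goes through
FBUtilities.directCheckSum):

    import java.nio.ByteBuffer;
    import java.util.zip.Adler32;

    public class DoubleChecksumSketch
    {
        public static void main(String[] args)
        {
            ByteBuffer bb = ByteBuffer.wrap("one chunk of data".getBytes());
            Adler32 incremental = new Adler32();   // reset after every chunk
            Adler32 full = new Adler32();          // accumulates the whole file

            ByteBuffer toAppend = bb.duplicate();
            toAppend.mark();                  // remember the start of the chunk
            incremental.update(toAppend);     // first pass consumes the buffer
            toAppend.reset();                 // rewind to the mark
            full.update(toAppend);            // second pass over the same bytes

            System.out.printf("chunk=%08x file=%08x%n",
                              incremental.getValue(), full.getValue());
        }
    }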
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 0188259..09efd48 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -309,7 +309,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
         if (newPosition >= length()) // it is safe to call length() in read-only mode
         {
             if (newPosition > length())
-                throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+                throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in read-only mode",
                                                              newPosition, getPath(), length()));
             buffer.limit(0);
             bufferOffset = newPosition;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 2fcdcbf..f8ea92f 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.io.util;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.StandardOpenOption;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +34,6 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CLibrary;
 
 /**
@@ -49,16 +50,16 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     // absolute path to the given file
     private final String filePath;
 
-    protected byte[] buffer;
+    protected ByteBuffer buffer;
     private final int fd;
     private int directoryFD;
     // directory should be synced only after first file sync, in other words, only once per file
     private boolean directorySynced = false;
 
-    protected long current = 0, bufferOffset;
-    protected int validBufferBytes;
+    // Offset for start of buffer relative to underlying file
+    protected long bufferOffset;
 
-    protected final RandomAccessFile out;
+    protected final FileChannel channel;
 
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
     // latency spikes
@@ -71,44 +72,40 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     protected Runnable runPostFlush;
 
-    public SequentialWriter(File file, int bufferSize)
+    public SequentialWriter(File file, int bufferSize, boolean offheap)
     {
         try
         {
-            out = new RandomAccessFile(file, "rw");
+            if (file.exists())
+                channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+            else
+                channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
         }
-        catch (FileNotFoundException e)
+        catch (IOException e)
         {
             throw new RuntimeException(e);
         }
 
         filePath = file.getAbsolutePath();
 
-        buffer = new byte[bufferSize];
+        // Allow children to allocate buffer as direct (snappy compression) if necessary
+        buffer = offheap ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
         this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
         this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
 
-        try
-        {
-            fd = CLibrary.getfd(out.getFD());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e); // shouldn't happen
-        }
+        fd = CLibrary.getfd(channel);
 
         directoryFD = CLibrary.tryOpenDirectory(file.getParent());
         stream = new DataOutputStreamAndChannel(this, this);
     }
 
+    /**
+     * Open a heap-based, non-compressed SequentialWriter
+     */
     public static SequentialWriter open(File file)
     {
-        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE);
-    }
-
-    public static SequentialWriter open(File file, int bufferSize)
-    {
-        return new SequentialWriter(file, bufferSize);
+        return new SequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
     }
 
     public static ChecksummedSequentialWriter open(File file, File crcPath)
@@ -126,38 +123,28 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     public void write(int value) throws ClosedChannelException
     {
-        if (current >= bufferOffset + buffer.length)
-            reBuffer();
+        if (buffer == null)
+            throw new ClosedChannelException();
 
-        assert current < bufferOffset + buffer.length
-                : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
+        if (!buffer.hasRemaining())
+        {
+            reBuffer();
+        }
 
-        buffer[bufferCursor()] = (byte) value;
+        buffer.put((byte) value);
 
-        validBufferBytes += 1;
-        current += 1;
         isDirty = true;
         syncNeeded = true;
     }
 
-    public void write(byte[] buffer) throws ClosedChannelException
+    public void write(byte[] buffer) throws IOException
     {
-        write(buffer, 0, buffer.length);
+        write(ByteBuffer.wrap(buffer, 0, buffer.length));
     }
 
-    public void write(byte[] data, int offset, int length) throws ClosedChannelException
+    public void write(byte[] data, int offset, int length) throws IOException
     {
-        if (buffer == null)
-            throw new ClosedChannelException();
-
-        while (length > 0)
-        {
-            int n = writeAtMost(data, offset, length);
-            offset += n;
-            length -= n;
-            isDirty = true;
-            syncNeeded = true;
-        }
+        write(ByteBuffer.wrap(data, offset, length));
     }
 
     public int write(ByteBuffer src) throws IOException
@@ -166,75 +153,23 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
             throw new ClosedChannelException();
 
         int length = src.remaining();
-        int offset = src.position();
-        while (length > 0)
+        int finalLimit = src.limit();
+        while (src.hasRemaining())
         {
-            int n = writeAtMost(src, offset, length);
-            offset += n;
-            length -= n;
+            if (!buffer.hasRemaining())
+                reBuffer();
+
+            if (buffer.remaining() < src.remaining())
+                src.limit(src.position() + buffer.remaining());
+            buffer.put(src);
+            src.limit(finalLimit);
+
             isDirty = true;
             syncNeeded = true;
         }
-        src.position(offset);
         return length;
     }
 
-    /*
-     * Write at most "length" bytes from "data" starting at position "offset", and
-     * return the number of bytes written. caller is responsible for setting
-     * isDirty.
-     */
-    private int writeAtMost(ByteBuffer data, int offset, int length)
-    {
-        if (current >= bufferOffset + buffer.length)
-            reBuffer();
-
-        assert current < bufferOffset + buffer.length
-        : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
-
-        int toCopy = Math.min(length, buffer.length - bufferCursor());
-
-        // copy bytes from external buffer
-        ByteBufferUtil.arrayCopy(data, offset, buffer, bufferCursor(), toCopy);
-
-        assert current <= bufferOffset + buffer.length
-        : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
-        validBufferBytes = Math.max(validBufferBytes, bufferCursor() + toCopy);
-        current += toCopy;
-
-        return toCopy;
-    }
-
-    /*
-     * Write at most "length" bytes from "data" starting at position "offset", and
-     * return the number of bytes written. caller is responsible for setting
-     * isDirty.
-     */
-    private int writeAtMost(byte[] data, int offset, int length)
-    {
-        if (current >= bufferOffset + buffer.length)
-            reBuffer();
-
-        assert current < bufferOffset + buffer.length
-                : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
-
-        int toCopy = Math.min(length, buffer.length - bufferCursor());
-
-        // copy bytes from external buffer
-        System.arraycopy(data, offset, buffer, bufferCursor(), toCopy);
-
-        assert current <= bufferOffset + buffer.length
-                : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
-        validBufferBytes = Math.max(validBufferBytes, bufferCursor() + toCopy);
-        current += toCopy;
-
-        return toCopy;
-    }
-
     /**
      * Synchronize file contents with disk.
      */
@@ -247,7 +182,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            out.getFD().sync();
+            channel.force(false);
         }
         catch (IOException e)
         {
@@ -291,7 +226,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
             if (trickleFsync)
             {
-                bytesSinceTrickleFsync += validBufferBytes;
+                bytesSinceTrickleFsync += buffer.position();
                 if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
                 {
                     syncDataOnlyInternal();
@@ -320,8 +255,9 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            out.write(buffer, 0, validBufferBytes);
-            lastFlushOffset += validBufferBytes;
+            buffer.flip();
+            channel.write(buffer);
+            lastFlushOffset += buffer.position();
         }
         catch (IOException e)
         {
@@ -333,7 +269,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     public long getFilePointer()
     {
-        return current;
+        return current();
     }
 
     /**
@@ -342,7 +278,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
      * size and not every write to the writer will modify this value.
      * Furthermore, for compressed files, this value refers to compressed data, while the
      * writer getFilePointer() refers to uncompressedFile
-     * 
+     *
      * @return the current file pointer
      */
     public long getOnDiskFilePointer()
@@ -354,7 +290,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+            return Math.max(current(), channel.size());
         }
         catch (IOException e)
         {
@@ -375,44 +311,48 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     protected void resetBuffer()
     {
-        bufferOffset = current;
-        validBufferBytes = 0;
+        bufferOffset = current();
+        buffer.clear();
     }
 
-    private int bufferCursor()
+    protected long current()
     {
-        return (int) (current - bufferOffset);
+        return bufferOffset + (buffer == null ? 0 : buffer.position());
     }
 
     public FileMark mark()
     {
-        return new BufferedFileWriterMark(current);
+        return new BufferedFileWriterMark(current());
     }
 
+    /**
+     * Drops all buffered data that's past the limits of our new file mark + buffer capacity, or syncs and truncates
+     * the underlying file to the marked position
+     */
     public void resetAndTruncate(FileMark mark)
     {
         assert mark instanceof BufferedFileWriterMark;
 
-        long previous = current;
-        current = ((BufferedFileWriterMark) mark).pointer;
+        long previous = current();
+        long truncateTarget = ((BufferedFileWriterMark) mark).pointer;
 
-        if (previous - current <= validBufferBytes) // current buffer
+        // If we're resetting to a point within our buffered data, just adjust our buffered position to drop bytes to
+        // the right of the desired mark.
+        if (previous - truncateTarget <= buffer.position())
         {
-            validBufferBytes = validBufferBytes - ((int) (previous - current));
+            buffer.position(buffer.position() - ((int) (previous - truncateTarget)));
             return;
         }
 
-        // synchronize current buffer with disk
-        // because we don't want any data loss
+        // synchronize current buffer with disk - we don't want any data loss
         syncInternal();
 
         // truncate file to given position
-        truncate(current);
+        truncate(truncateTarget);
 
-        // reset channel position
         try
         {
-            out.seek(current);
+            channel.position(truncateTarget);
         }
         catch (IOException e)
         {
@@ -431,7 +371,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     {
         try
         {
-            out.getChannel().truncate(toSize);
+            channel.truncate(toSize);
         }
         catch (IOException e)
         {
@@ -441,7 +381,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     public boolean isOpen()
     {
-        return out.getChannel().isOpen();
+        return channel.isOpen();
     }
 
     @Override
@@ -472,7 +412,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         }
 
         // close is idempotent
-        try { out.close(); }
+        try { channel.close(); }
         catch (Throwable t) { handle(t, throwExceptions); }
     }
 

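Note: the rewritten write(ByteBuffer) above copies an arbitrarily large source
into the fixed-size internal buffer by temporarily clamping the source's limit
to what fits, then restoring it for the next slice. The clamped-limit loop in
isolation (staging.clear() stands in for the flush that reBuffer() performs):

    import java.nio.ByteBuffer;

    public class ClampedCopySketch
    {
        static void copyInSlices(ByteBuffer src, ByteBuffer staging)
        {
            int finalLimit = src.limit();
            while (src.hasRemaining())
            {
                if (!staging.hasRemaining())
                    staging.clear();          // flush-and-reset stand-in
                if (staging.remaining() < src.remaining())
                    src.limit(src.position() + staging.remaining()); // clamp
                staging.put(src);
                src.limit(finalLimit);        // restore for the next slice
            }
        }

        public static void main(String[] args)
        {
            ByteBuffer src = ByteBuffer.wrap(new byte[100]);
            ByteBuffer staging = ByteBuffer.allocate(16);
            copyInSlices(src, staging);
            System.out.println("copied " + src.position() + " bytes");
        }
    }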
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f8a0ea7..2263e46 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -667,7 +667,8 @@ public class FBUtilities
             {
                 directUpdate.invoke(checksum, bb);
                 return;
-            } catch (IllegalAccessException e)
+            }
+            catch (IllegalAccessException e)
             {
                 directUpdate = null;
                 logger.warn("JVM doesn't support Adler32 byte buffer access");

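Note: FBUtilities.directCheckSum reaches Adler32.update(ByteBuffer) through
reflection so the code still loads on JVMs that predate Java 8, where that
overload was added; the warning above fires on the fallback path. On Java 8
the call is direct and avoids copying direct buffers to byte[]:

    import java.nio.ByteBuffer;
    import java.util.zip.Adler32;

    public class DirectAdlerSketch
    {
        public static void main(String[] args)
        {
            ByteBuffer direct = ByteBuffer.allocateDirect(8);
            direct.putLong(0x0123456789ABCDEFL).flip();

            Adler32 adler = new Adler32();
            adler.update(direct);  // Java 8 ByteBuffer overload, no intermediate copy
            System.out.printf("%08x%n", adler.getValue());
        }
    }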
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 58bf5cb..2eff19b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -55,6 +55,7 @@ public class CompressedRandomAccessReaderTest
         testResetAndTruncate(File.createTempFile("compressed", "1"), true, 10);
         testResetAndTruncate(File.createTempFile("compressed", "2"), true, CompressionParameters.DEFAULT_CHUNK_LENGTH);
     }
+
     @Test
     public void test6791() throws IOException, ConfigurationException
     {
@@ -105,7 +106,7 @@ public class CompressedRandomAccessReaderTest
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
             SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
-                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH);
+                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
 
             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
new file mode 100644
index 0000000..ab904f9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class CompressedSequentialWriterTest
+{
+    private ICompressor compressor;
+
+    private void runTests(String testName) throws IOException
+    {
+        // Test small < 1 chunk data set
+        testWrite(File.createTempFile(testName + "_small", "1"), 25);
+
+        // Test to confirm pipeline w/chunk-aligned data writes works
+        testWrite(File.createTempFile(testName + "_chunkAligned", "1"), CompressionParameters.DEFAULT_CHUNK_LENGTH);
+
+        // Test to confirm pipeline on non-chunk boundaries works
+        testWrite(File.createTempFile(testName + "_large", "1"), CompressionParameters.DEFAULT_CHUNK_LENGTH * 3 + 100);
+    }
+
+    @Test
+    public void testLZ4Writer() throws IOException
+    {
+        compressor = LZ4Compressor.instance;
+        runTests("LZ4");
+    }
+
+    @Test
+    public void testDeflateWriter() throws IOException
+    {
+        compressor = DeflateCompressor.instance;
+        runTests("Deflate");
+    }
+
+    @Test
+    public void testSnappyWriter() throws IOException
+    {
+        compressor = SnappyCompressor.instance;
+        runTests("Snappy");
+    }
+
+    private void testWrite(File f, int bytesToTest) throws IOException
+    {
+        try
+        {
+            final String filename = f.getAbsolutePath();
+
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);
+
+            byte[] dataPre = new byte[bytesToTest];
+            byte[] rawPost = new byte[bytesToTest];
+            Random r = new Random();
+
+            // Test both write with byte[] and ByteBuffer
+            r.nextBytes(dataPre);
+            r.nextBytes(rawPost);
+            ByteBuffer dataPost = makeBB(bytesToTest);
+            dataPost.put(rawPost);
+            dataPost.flip();
+
+            writer.write(dataPre);
+            FileMark mark = writer.mark();
+
+            // Write enough garbage to cross a chunk boundary
+            for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+            {
+                writer.write((byte)i);
+            }
+            writer.resetAndTruncate(mark);
+            writer.write(dataPost);
+            writer.close();
+
+            assert f.exists();
+            RandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()));
+            assertEquals(dataPre.length + rawPost.length, reader.length());
+            byte[] result = new byte[(int)reader.length()];
+
+            reader.readFully(result);
+
+            assert(reader.isEOF());
+            reader.close();
+
+            byte[] fullInput = new byte[bytesToTest * 2];
+            System.arraycopy(dataPre, 0, fullInput, 0, dataPre.length);
+            System.arraycopy(rawPost, 0, fullInput, bytesToTest, rawPost.length);
+            assert Arrays.equals(result, fullInput);
+        }
+        finally
+        {
+            // cleanup
+            if (f.exists())
+                f.delete();
+            File metadata = new File(f + ".metadata");
+            if (metadata.exists())
+                metadata.delete();
+        }
+    }
+
+    private ByteBuffer makeBB(int size)
+    {
+        return compressor.useDirectOutputByteBuffers()
+                ? ByteBuffer.allocateDirect(size)
+                : ByteBuffer.allocate(size);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index 5df0b29..53021ee 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -20,17 +20,21 @@ package org.apache.cassandra.io.compress;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Random;
 
-import com.google.common.io.Files;
-import org.apache.cassandra.io.compress.ICompressor.WrappedArray;
+import static org.junit.Assert.*;
 
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import com.google.common.io.Files;
+
+import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CompressorTest
 {
@@ -42,7 +46,6 @@ public class CompressorTest
             SnappyCompressor.create(Collections.<String, String>emptyMap())
     };
 
-
     @Test
     public void testAllCompressors() throws IOException
     {
@@ -57,69 +60,90 @@ public class CompressorTest
         }
     }
 
-
-    public void test(byte[] data, int off, int len) throws IOException
+    public void testArrayUncompress(byte[] data, int off, int len) throws IOException
     {
+        ByteBuffer src = makeBB(len);
+        src.put(data, off, len);
+        src.rewind();
+
         final int outOffset = 3;
-        final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(len)]);
-        new Random().nextBytes(out.buffer);
-        final int compressedLength = compressor.compress(data, off, len, out, outOffset);
-        final int restoredOffset = 5;
-        final byte[] restored = new byte[restoredOffset + len];
+        final WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(len));
+        fillBBWithRandom(compressed.buffer);
+        compressed.buffer.clear();
+        compressed.buffer.position(outOffset);
+
+        final int compressedLength = compressor.compress(src, compressed);
+
+        final int restoreOffset = 5;
+        final byte[] restored = new byte[restoreOffset + len];
         new Random().nextBytes(restored);
-        final int decompressedLength = compressor.uncompress(out.buffer, outOffset, compressedLength, restored, restoredOffset);
+
+        // need byte[] representation which direct buffers don't have
+        byte[] compressedBytes = new byte[compressed.buffer.capacity()];
+        ByteBufferUtil.arrayCopy(compressed.buffer, outOffset, compressedBytes, 0, compressed.buffer.capacity() - outOffset);
+
+        final int decompressedLength = compressor.uncompress(compressedBytes, 0, compressedLength, restored, restoreOffset);
+
         assertEquals(decompressedLength, len);
         assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
-                          Arrays.copyOfRange(restored, restoredOffset, restoredOffset + decompressedLength));
+                Arrays.copyOfRange(restored, restoreOffset, restoreOffset + decompressedLength));
     }
 
-    public void test(byte[] data) throws IOException
+    public void testArrayUncompress(byte[] data) throws IOException
     {
-        test(data, 0, data.length);
+        testArrayUncompress(data, 0, data.length);
     }
 
     public void testEmptyArray() throws IOException
     {
-        test(new byte[0]);
+        testArrayUncompress(new byte[0]);
     }
 
     public void testShortArray() throws UnsupportedEncodingException, IOException
     {
-        test("Cassandra".getBytes("UTF-8"), 1, 7);
+        testArrayUncompress("Cassandra".getBytes("UTF-8"), 1, 7);
     }
 
     public void testLongArray() throws UnsupportedEncodingException, IOException
     {
         byte[] data = new byte[1 << 20];
-        test(data, 13, 1 << 19);
+        testArrayUncompress(data, 13, 1 << 19);
         new Random(0).nextBytes(data);
-        test(data, 13, 1 << 19);
+        testArrayUncompress(data, 13, 1 << 19);
     }
 
     public void testMappedFile() throws IOException
     {
         byte[] data = new byte[1 << 20];
         new Random().nextBytes(data);
+        ByteBuffer src = makeBB(data.length);
+        src.put(data);
+        src.flip();
 
-        //create a temp file
+        // create a temp file
         File temp = File.createTempFile("tempfile", ".tmp");
         temp.deleteOnExit();
 
-        //Prepend some random bytes to the output and compress
+        // Prepend some random bytes to the output and compress
         final int outOffset = 3;
-        final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(data.length)]);
-        new Random().nextBytes(out.buffer);
-        final int compressedLength = compressor.compress(data, 0, data.length, out, outOffset);
-        Files.write(out.buffer, temp);
+        WrappedByteBuffer dest = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(data.length));
+        fillBBWithRandom(dest.buffer);
+        dest.buffer.clear();
+        dest.buffer.position(outOffset);
+
+        final int compressedLength = compressor.compress(src, dest);
+
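+        // write the whole buffer (random prefix plus compressed data) to disk through an nio channel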
+        FileChannel channel = new FileOutputStream(temp, false).getChannel();
+        dest.buffer.clear();
+        channel.write(dest.buffer);
+        channel.close(); // closing the channel also closes the underlying stream
 
         MappedByteBuffer mappedData = Files.map(temp);
         mappedData.position(outOffset);
-        mappedData.limit(compressedLength+outOffset);
-
+        mappedData.limit(compressedLength + outOffset);
 
-        ByteBuffer result = compressor.useDirectOutputByteBuffers()
-                ? ByteBuffer.allocateDirect(data.length + 100)
-                : ByteBuffer.allocate(data.length + 100);
+        ByteBuffer result = makeBB(data.length + 100);
 
         int length = compressor.uncompress(mappedData, result);
 
@@ -129,4 +153,79 @@ public class CompressorTest
             Assert.assertEquals("Decompression mismatch at byte "+i, data[i], result.get());
         }
     }
+
+    @Test
+    public void testLZ4ByteBuffers() throws IOException
+    {
+        compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
+        testByteBuffers();
+    }
+
+    @Test
+    public void testDeflateByteBuffers() throws IOException
+    {
+        compressor = DeflateCompressor.create(Collections.<String, String>emptyMap());
+        testByteBuffers();
+    }
+
+    @Test
+    public void testSnappyByteBuffers() throws IOException
+    {
+        compressor = SnappyCompressor.create(Collections.<String, String>emptyMap());
+        testByteBuffers();
+    }
+
+    private void testByteBuffers() throws IOException
+    {
+        int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+        byte[] srcData = new byte[n];
+        new Random().nextBytes(srcData);
+
+        ByteBuffer src = makeBB(n);
+        src.put(srcData, 0, n);
+        src.flip();
+
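+        // compress at a nonzero offset into a deliberately dirtied buffer, then round-trip and compare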
+        int outOffset = 5;
+        WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(srcData.length));
+        fillBBWithRandom(compressed.buffer);
+        compressed.buffer.clear();
+        compressed.buffer.position(outOffset);
+
+        compressor.compress(src, compressed);
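+        // flip for reading (limit = end of written data, position = 0), then skip past the garbage prefix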
+        compressed.buffer.flip();
+        compressed.buffer.position(outOffset);
+
+        ByteBuffer result = makeBB(outOffset + n);
+        int decompressed = compressor.uncompress(compressed.buffer, result);
+
+        assertEquals(n, decompressed);
+        for (int i = 0; i < n; ++i)
+            assertEquals("Failed comparison on index: " + i + " with compressor: " + compressor.getClass(), srcData[i], result.get(i));
+    }
+
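+    // allocate a direct or heap buffer to match the compressor's preferred output type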
+    private ByteBuffer makeBB(int size)
+    {
+        return compressor.useDirectOutputByteBuffers()
+                ? ByteBuffer.allocateDirect(size)
+                : ByteBuffer.allocate(size);
+    }
+
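+    // the wrapper presumably exists so a compressor can swap in a larger buffer mid-compress; see ICompressor.WrappedByteBuffer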
+    private WrappedByteBuffer makeWrappedBB(int size)
+    {
+        return compressor.useDirectOutputByteBuffers()
+                ? new WrappedByteBuffer(ByteBuffer.allocateDirect(size))
+                : new WrappedByteBuffer(ByteBuffer.allocate(size));
+    }
+
+    // overwrite the buffer's full capacity with random bytes; position is left at capacity
+    private void fillBBWithRandom(ByteBuffer dest)
+    {
+        byte[] random = new byte[dest.capacity()];
+        new Random().nextBytes(random);
+        dest.clear();
+        dest.put(random);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index f2db428..624ca9b 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class DataOutputTest
 {
-
     @Test
     public void testDataOutputStreamPlus() throws IOException
     {
@@ -145,7 +144,7 @@ public class DataOutputTest
     public void testSequentialWriter() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
-        final SequentialWriter writer = new SequentialWriter(file, 32);
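+        // third argument is new with the nio conversion; presumably false keeps the writer on a heap buffer rather than a direct one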
+        final SequentialWriter writer = new SequentialWriter(file, 32, false);
         DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
         DataInput canon = testWrite(write);
         write.flush();
@@ -162,32 +161,33 @@ public class DataOutputTest
         final DataOutput canon = new DataOutputStream(bos);
         Random rnd = ThreadLocalRandom.current();
 
-        byte[] bytes = new byte[50];
+        int size = 50;
+        byte[] bytes = new byte[size];
         rnd.nextBytes(bytes);
         ByteBufferUtil.writeWithLength(bytes, test);
         ByteBufferUtil.writeWithLength(bytes, canon);
 
-        bytes = new byte[50];
+        bytes = new byte[size];
         rnd.nextBytes(bytes);
         ByteBufferUtil.writeWithLength(wrap(bytes, false), test);
         ByteBufferUtil.writeWithLength(bytes, canon);
 
-        bytes = new byte[50];
+        bytes = new byte[size];
         rnd.nextBytes(bytes);
         ByteBufferUtil.writeWithLength(wrap(bytes, true), test);
         ByteBufferUtil.writeWithLength(bytes, canon);
 
-        bytes = new byte[50];
+        bytes = new byte[size];
         rnd.nextBytes(bytes);
         ByteBufferUtil.writeWithShortLength(bytes, test);
         ByteBufferUtil.writeWithShortLength(bytes, canon);
 
-        bytes = new byte[50];
+        bytes = new byte[size];
         rnd.nextBytes(bytes);
         ByteBufferUtil.writeWithShortLength(wrap(bytes, false), test);
         ByteBufferUtil.writeWithShortLength(bytes, canon);
 
-        bytes = new byte[50];
+        bytes = new byte[size];
         rnd.nextBytes(bytes);
         ByteBufferUtil.writeWithShortLength(wrap(bytes, true), test);
         ByteBufferUtil.writeWithShortLength(bytes, canon);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 128ec3c..016deb3 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -117,8 +117,8 @@ public class CompressedInputStreamTest
         for (int i = 0; i < sections.size(); i++)
         {
             input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            long readValue = in.readLong();
+            assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
         }
     }
 }