You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/03/11 17:45:19 UTC
cassandra git commit: Convert SequentialWriter to nio
Repository: cassandra
Updated Branches:
refs/heads/trunk c38ea0b1b -> bc7941c9d
Convert SequentialWriter to nio
Patch by jmckenzie; reviewed by tjake for CASSANDRA-8709
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc7941c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc7941c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc7941c9
Branch: refs/heads/trunk
Commit: bc7941c9da7ad17fdf4793de411541876a1248ff
Parents: c38ea0b
Author: Joshua McKenzie <jm...@apache.org>
Authored: Wed Mar 11 11:43:04 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Mar 11 11:43:04 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedSequentialWriter.java | 112 +++++++----
.../io/compress/CompressionMetadata.java | 1 +
.../io/compress/DeflateCompressor.java | 21 +-
.../cassandra/io/compress/ICompressor.java | 25 ++-
.../cassandra/io/compress/LZ4Compressor.java | 28 +--
.../cassandra/io/compress/SnappyCompressor.java | 9 +-
.../io/util/ChecksummedSequentialWriter.java | 12 +-
.../io/util/DataIntegrityMetadata.java | 25 ++-
.../cassandra/io/util/RandomAccessReader.java | 2 +-
.../cassandra/io/util/SequentialWriter.java | 200 +++++++------------
.../org/apache/cassandra/utils/FBUtilities.java | 3 +-
.../CompressedRandomAccessReaderTest.java | 3 +-
.../CompressedSequentialWriterTest.java | 136 +++++++++++++
.../cassandra/io/compress/CompressorTest.java | 159 ++++++++++++---
.../cassandra/io/util/DataOutputTest.java | 16 +-
.../compress/CompressedInputStreamTest.java | 4 +-
17 files changed, 503 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 760068a..b999577 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Convert SequentialWriter to nio (CASSANDRA-8709)
* Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
* Record client ip address in tracing sessions (CASSANDRA-8162)
* Indicate partition key columns in response metadata for prepared
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 3fc35db..b4d9dcc 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -20,18 +20,19 @@ package org.apache.cassandra.io.compress;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.zip.Adler32;
-import java.util.zip.Checksum;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.FINAL;
import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED;
@@ -50,31 +51,35 @@ public class CompressedSequentialWriter extends SequentialWriter
private final ICompressor compressor;
// used to store compressed data
- private final ICompressor.WrappedArray compressed;
+ private final ICompressor.WrappedByteBuffer compressed;
// holds a number of already written chunks
private int chunkCount = 0;
- private long originalSize = 0, compressedSize = 0;
+ private long uncompressedSize = 0, compressedSize = 0;
private final MetadataCollector sstableMetadataCollector;
+ private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4);
+
public CompressedSequentialWriter(File file,
String offsetsPath,
CompressionParameters parameters,
MetadataCollector sstableMetadataCollector)
{
- super(file, parameters.chunkLength());
+ super(file, parameters.chunkLength(), parameters.sstableCompressor.useDirectOutputByteBuffers());
this.compressor = parameters.sstableCompressor;
// buffer for compression should be the same size as buffer itself
- compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
+ compressed = compressor.useDirectOutputByteBuffers()
+ ? new ICompressor.WrappedByteBuffer(ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(buffer.capacity())))
+ : new ICompressor.WrappedByteBuffer(ByteBuffer.allocate(compressor.initialCompressedBufferLength(buffer.capacity())));
/* Index File (-CompressionInfo.db component) and it's header */
metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
this.sstableMetadataCollector = sstableMetadataCollector;
- crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
+ crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStreamAndChannel(channel));
}
@Override
@@ -82,7 +87,7 @@ public class CompressedSequentialWriter extends SequentialWriter
{
try
{
- return out.getFilePointer();
+ return channel.position();
}
catch (IOException e)
{
@@ -111,14 +116,19 @@ public class CompressedSequentialWriter extends SequentialWriter
try
{
// compressing data with buffer re-use
- compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
+ buffer.flip();
+ compressed.buffer.clear();
+ compressedLength = compressor.compress(buffer, compressed);
+
+ // Compressors don't modify sentinels in our BB - we rely on buffer.position() for bufferOffset adjustment
+ buffer.position(buffer.limit());
}
catch (IOException e)
{
throw new RuntimeException("Compression exception", e); // shouldn't happen
}
- originalSize += validBufferBytes;
+ uncompressedSize += buffer.position();
compressedSize += compressedLength;
try
@@ -127,13 +137,19 @@ public class CompressedSequentialWriter extends SequentialWriter
metadataWriter.addOffset(chunkOffset);
chunkCount++;
- assert compressedLength <= compressed.buffer.length;
+ assert compressedLength <= compressed.buffer.capacity();
+
+ // write out the compressed data
+ compressed.buffer.flip();
+ channel.write(compressed.buffer);
- // write data itself
- out.write(compressed.buffer, 0, compressedLength);
// write corresponding checksum
- crcMetadata.append(compressed.buffer, 0, compressedLength, true);
+ compressed.buffer.rewind();
+ crcMetadata.appendDirect(compressed.buffer);
lastFlushOffset += compressedLength + 4;
+
+ // adjust our bufferOffset to account for the new uncompressed data we've now written out
+ resetBuffer();
}
catch (IOException e)
{
@@ -149,7 +165,7 @@ public class CompressedSequentialWriter extends SequentialWriter
public CompressionMetadata open(long overrideLength, boolean isFinal)
{
if (overrideLength <= 0)
- return metadataWriter.open(originalSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
+ return metadataWriter.open(uncompressedSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
// we are early opening the file, make sure we open metadata with the correct size
assert !isFinal;
return metadataWriter.open(overrideLength, chunkOffset, SHARED);
@@ -158,7 +174,7 @@ public class CompressedSequentialWriter extends SequentialWriter
@Override
public FileMark mark()
{
- return new CompressedFileWriterMark(chunkOffset, current, validBufferBytes, chunkCount + 1);
+ return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
}
@Override
@@ -169,46 +185,54 @@ public class CompressedSequentialWriter extends SequentialWriter
CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;
// reset position
- current = realMark.uncDataOffset;
+ long truncateTarget = realMark.uncDataOffset;
- if (realMark.chunkOffset == chunkOffset) // current buffer
+ if (realMark.chunkOffset == chunkOffset)
{
- // just reset a buffer offset and return
- validBufferBytes = realMark.bufferOffset;
+ // simply drop bytes to the right of our mark
+ buffer.position(realMark.validBufferBytes);
return;
}
- // synchronize current buffer with disk
- // because we don't want any data loss
+ // synchronize current buffer with disk - we don't want any data loss
syncInternal();
- // setting marker as a current offset
chunkOffset = realMark.chunkOffset;
// compressed chunk size (- 4 bytes reserved for checksum)
int chunkSize = (int) (metadataWriter.chunkOffsetBy(realMark.nextChunkIndex) - chunkOffset - 4);
- if (compressed.buffer.length < chunkSize)
- compressed.buffer = new byte[chunkSize];
+ if (compressed.buffer.capacity() < chunkSize)
+ compressed.buffer = compressor.useDirectOutputByteBuffers()
+ ? ByteBuffer.allocateDirect(chunkSize)
+ : ByteBuffer.allocate(chunkSize);
try
{
- out.seek(chunkOffset);
- out.readFully(compressed.buffer, 0, chunkSize);
+ compressed.buffer.clear();
+ compressed.buffer.limit(chunkSize);
+ channel.position(chunkOffset);
+ channel.read(compressed.buffer);
try
{
- // repopulate buffer
- compressor.uncompress(compressed.buffer, 0, chunkSize, buffer, 0);
+ // Repopulate buffer from compressed data
+ buffer.clear();
+ compressed.buffer.flip();
+ compressor.uncompress(compressed.buffer, buffer);
}
catch (IOException e)
{
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
}
- Checksum checksum = new Adler32();
- checksum.update(compressed.buffer, 0, chunkSize);
+ Adler32 checksum = new Adler32();
- if (out.readInt() != (int) checksum.getValue())
+ FBUtilities.directCheckSum(checksum, compressed.buffer);
+
+ crcCheckBuffer.clear();
+ channel.read(crcCheckBuffer);
+ crcCheckBuffer.flip();
+ if (crcCheckBuffer.getInt() != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
}
catch (CorruptBlockException e)
@@ -224,9 +248,11 @@ public class CompressedSequentialWriter extends SequentialWriter
throw new FSReadError(e, getPath());
}
- // reset buffer
- validBufferBytes = realMark.bufferOffset;
- bufferOffset = current - validBufferBytes;
+ // Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
+ buffer.position(realMark.validBufferBytes);
+ isDirty = true;
+
+ bufferOffset = truncateTarget - buffer.position();
chunkCount = realMark.nextChunkIndex - 1;
// truncate data and index file
@@ -243,7 +269,7 @@ public class CompressedSequentialWriter extends SequentialWriter
{
try
{
- out.seek(chunkOffset);
+ channel.position(chunkOffset);
}
catch (IOException e)
{
@@ -256,13 +282,15 @@ public class CompressedSequentialWriter extends SequentialWriter
public void close()
{
if (buffer == null)
- return; // already closed
+ return;
+
+ long finalPosition = current();
super.close();
- sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
+ sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
try
{
- metadataWriter.close(current, chunkCount);
+ metadataWriter.close(finalPosition, chunkCount);
}
catch (IOException e)
{
@@ -292,14 +320,14 @@ public class CompressedSequentialWriter extends SequentialWriter
// uncompressed data offset (real data offset)
final long uncDataOffset;
- final int bufferOffset;
+ final int validBufferBytes;
final int nextChunkIndex;
- public CompressedFileWriterMark(long chunkOffset, long uncDataOffset, int bufferOffset, int nextChunkIndex)
+ public CompressedFileWriterMark(long chunkOffset, long uncDataOffset, int validBufferBytes, int nextChunkIndex)
{
this.chunkOffset = chunkOffset;
this.uncDataOffset = uncDataOffset;
- this.bufferOffset = bufferOffset;
+ this.validBufferBytes = validBufferBytes;
this.nextChunkIndex = nextChunkIndex;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 7237cb9..182cdd2 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -134,6 +134,7 @@ public class CompressionMetadata
{
FileUtils.closeQuietly(stream);
}
+
this.chunkOffsetsSize = chunkOffsets.size();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 546b506..bede4da 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -71,29 +71,34 @@ public class DeflateCompressor implements ICompressor
return chunkLength;
}
- public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset)
+ public int compress(ByteBuffer src, ICompressor.WrappedByteBuffer dest)
{
+ assert dest.buffer.hasArray();
+
Deflater def = deflater.get();
def.reset();
- def.setInput(input, inputOffset, inputLength);
+ def.setInput(src.array(), src.position(), src.limit());
def.finish();
if (def.needsInput())
return 0;
- int offs = outputOffset;
+ int startPos = dest.buffer.position();
while (true)
{
- offs += def.deflate(output.buffer, offs, output.buffer.length - offs);
+ int arrayOffset = dest.buffer.arrayOffset();
+ int len = def.deflate(dest.buffer.array(), arrayOffset + dest.buffer.position(), dest.buffer.remaining());
+ dest.buffer.position(dest.buffer.position() + len);
if (def.finished())
{
- return offs - outputOffset;
+ return dest.buffer.position() - startPos;
}
else
{
// We're not done, output was too small. Increase it and continue
- byte[] newBuffer = new byte[(output.buffer.length*4)/3 + 1];
- System.arraycopy(output.buffer, 0, newBuffer, 0, offs);
- output.buffer = newBuffer;
+ ByteBuffer newDest = ByteBuffer.allocate(dest.buffer.capacity()*4/3 + 1);
+ dest.buffer.rewind();
+ newDest.put(dest.buffer);
+ dest.buffer = newDest;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/ICompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index 81d1425..0326a9f 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -25,11 +25,14 @@ public interface ICompressor
{
public int initialCompressedBufferLength(int chunkLength);
- public int compress(byte[] input, int inputOffset, int inputLength, WrappedArray output, int outputOffset) throws IOException;
-
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
/**
+ * Compression for ByteBuffers
+ */
+ public int compress(ByteBuffer input, WrappedByteBuffer output) throws IOException;
+
+ /**
* Decompression for DirectByteBuffers
*/
public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
@@ -43,18 +46,18 @@ public interface ICompressor
public Set<String> supportedOptions();
/**
- * A simple wrapper of a byte array.
- * Not all implementation allows to know what is the maximum size after
- * compression. This make it hard to size the ouput buffer for compress
- * (and we want to reuse the buffer). Instead we use this wrapped buffer
- * so that compress can have the liberty to resize underlying array if
- * need be.
+ * A simple wrapped ByteBuffer.
+ * Not all implementations allow us to know the maximum size after
+ * compression. This makes it hard to size the output buffer for compression
+ * (and we want to reuse the buffer). Instead we use this wrapped ByteBuffer
+ * so that compress(...) can have the liberty to resize the underlying array if
+ * necessary.
*/
- public static class WrappedArray
+ public static class WrappedByteBuffer
{
- public byte[] buffer;
+ public ByteBuffer buffer;
- public WrappedArray(byte[] buffer)
+ public WrappedByteBuffer(ByteBuffer buffer)
{
this.buffer = buffer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index f32f266..ab10a00 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -24,13 +24,16 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
public class LZ4Compressor implements ICompressor
{
private static final int INTEGER_BYTES = 4;
- private static final LZ4Compressor instance = new LZ4Compressor();
+
+ @VisibleForTesting
+ public static final LZ4Compressor instance = new LZ4Compressor();
public static LZ4Compressor create(Map<String, String> args)
{
@@ -52,18 +55,20 @@ public class LZ4Compressor implements ICompressor
return INTEGER_BYTES + compressor.maxCompressedLength(chunkLength);
}
- public int compress(byte[] input, int inputOffset, int inputLength, WrappedArray output, int outputOffset) throws IOException
+ public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
{
- final byte[] dest = output.buffer;
- dest[outputOffset] = (byte) inputLength;
- dest[outputOffset + 1] = (byte) (inputLength >>> 8);
- dest[outputOffset + 2] = (byte) (inputLength >>> 16);
- dest[outputOffset + 3] = (byte) (inputLength >>> 24);
- final int maxCompressedLength = compressor.maxCompressedLength(inputLength);
+ final ByteBuffer buf = dest.buffer;
+ int len = src.remaining();
+ dest.buffer.put((byte) len);
+ dest.buffer.put((byte) (len >>> 8));
+ dest.buffer.put((byte) (len >>> 16));
+ dest.buffer.put((byte) (len >>> 24));
+
+ int start = dest.buffer.position();
try
{
- return INTEGER_BYTES + compressor.compress(input, inputOffset, inputLength,
- dest, outputOffset + INTEGER_BYTES, maxCompressedLength);
+ compressor.compress(src, dest.buffer);
+ return INTEGER_BYTES + (buf.position() - start);
}
catch (LZ4Exception e)
{
@@ -104,7 +109,6 @@ public class LZ4Compressor implements ICompressor
| ((input.get(pos + 1) & 0xFF) << 8)
| ((input.get(pos + 2) & 0xFF) << 16)
| ((input.get(pos + 3) & 0xFF) << 24);
-
int inputLength = input.remaining() - INTEGER_BYTES;
final int compressedLength;
@@ -119,7 +123,7 @@ public class LZ4Compressor implements ICompressor
if (compressedLength != inputLength)
{
- throw new IOException("Compressed lengths mismatch: "+compressedLength+" vs "+inputLength);
+ throw new IOException("Compressed lengths mismatch - got: "+compressedLength+" vs expected: "+inputLength);
}
return decompressedLength;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index 0094042..7a91df1 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -87,9 +87,14 @@ public class SnappyCompressor implements ICompressor
return Snappy.maxCompressedLength(chunkLength);
}
- public int compress(byte[] input, int inputOffset, int inputLength, ICompressor.WrappedArray output, int outputOffset) throws IOException
+ public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
{
- return Snappy.rawCompress(input, inputOffset, inputLength, output.buffer, outputOffset);
+ int result = Snappy.compress(src, dest.buffer);
+
+ // Snappy doesn't match LZ4 and Deflate with regard to the state it leaves dest ByteBuffer's counters in
+ dest.buffer.position(dest.buffer.limit());
+ dest.buffer.limit(dest.buffer.capacity());
+ return result;
}
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 526347b..5c5637a 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.util;
import java.io.File;
+import java.nio.ByteBuffer;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -28,16 +29,19 @@ public class ChecksummedSequentialWriter extends SequentialWriter
public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
{
- super(file, bufferSize);
- crcWriter = new SequentialWriter(crcPath, 8 * 1024);
+ super(file, bufferSize, false);
+ crcWriter = new SequentialWriter(crcPath, 8 * 1024, false);
crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
- crcMetadata.writeChunkSize(buffer.length);
+ crcMetadata.writeChunkSize(buffer.capacity());
}
protected void flushData()
{
super.flushData();
- crcMetadata.append(buffer, 0, validBufferBytes, false);
+ ByteBuffer toAppend = buffer.duplicate();
+ toAppend.position(0);
+ toAppend.limit(buffer.position());
+ crcMetadata.appendDirect(toAppend);
}
public void writeFullChecksum(Descriptor descriptor)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 281eb7b..464f9d2 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.utils.CRC32Factory;
+import org.apache.cassandra.utils.FBUtilities;
public class DataIntegrityMetadata
{
@@ -87,9 +88,9 @@ public class DataIntegrityMetadata
public static class ChecksumWriter
{
- private final Checksum incrementalChecksum = new Adler32();
+ private final Adler32 incrementalChecksum = new Adler32();
private final DataOutput incrementalOut;
- private final Checksum fullChecksum = new Adler32();
+ private final Adler32 fullChecksum = new Adler32();
public ChecksumWriter(DataOutput incrementalOut)
{
@@ -141,6 +142,26 @@ public class DataIntegrityMetadata
}
}
+ public void appendDirect(ByteBuffer bb)
+ {
+ try
+ {
+ ByteBuffer toAppend = bb.duplicate();
+ toAppend.mark();
+ FBUtilities.directCheckSum(incrementalChecksum, toAppend);
+ toAppend.reset();
+
+ incrementalOut.writeInt((int) incrementalChecksum.getValue());
+ incrementalChecksum.reset();
+
+ FBUtilities.directCheckSum(fullChecksum, toAppend);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
public void writeFullChecksum(Descriptor descriptor)
{
File outFile = new File(descriptor.filenameFor(Component.DIGEST));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 0188259..09efd48 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -309,7 +309,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
if (newPosition >= length()) // it is save to call length() in read-only mode
{
if (newPosition > length())
- throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+ throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in read-only mode",
newPosition, getPath(), length()));
buffer.limit(0);
bufferOffset = newPosition;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 2fcdcbf..f8ea92f 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
+import java.nio.file.StandardOpenOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +34,6 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CLibrary;
/**
@@ -49,16 +50,16 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
// absolute path to the given file
private final String filePath;
- protected byte[] buffer;
+ protected ByteBuffer buffer;
private final int fd;
private int directoryFD;
// directory should be synced only after first file sync, in other words, only once per file
private boolean directorySynced = false;
- protected long current = 0, bufferOffset;
- protected int validBufferBytes;
+ // Offset for start of buffer relative to underlying file
+ protected long bufferOffset;
- protected final RandomAccessFile out;
+ protected final FileChannel channel;
// whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
// latency spikes
@@ -71,44 +72,40 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected Runnable runPostFlush;
- public SequentialWriter(File file, int bufferSize)
+ public SequentialWriter(File file, int bufferSize, boolean offheap)
{
try
{
- out = new RandomAccessFile(file, "rw");
+ if (file.exists())
+ channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+ else
+ channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
}
- catch (FileNotFoundException e)
+ catch (IOException e)
{
throw new RuntimeException(e);
}
filePath = file.getAbsolutePath();
- buffer = new byte[bufferSize];
+ // Allow children to allocate buffer as direct (snappy compression) if necessary
+ buffer = offheap ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
- try
- {
- fd = CLibrary.getfd(out.getFD());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e); // shouldn't happen
- }
+ fd = CLibrary.getfd(channel);
directoryFD = CLibrary.tryOpenDirectory(file.getParent());
stream = new DataOutputStreamAndChannel(this, this);
}
+ /**
+ * Open a heap-based, non-compressed SequentialWriter
+ */
public static SequentialWriter open(File file)
{
- return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE);
- }
-
- public static SequentialWriter open(File file, int bufferSize)
- {
- return new SequentialWriter(file, bufferSize);
+ return new SequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
}
public static ChecksummedSequentialWriter open(File file, File crcPath)
@@ -126,38 +123,28 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
public void write(int value) throws ClosedChannelException
{
- if (current >= bufferOffset + buffer.length)
- reBuffer();
+ if (buffer == null)
+ throw new ClosedChannelException();
- assert current < bufferOffset + buffer.length
- : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
+ if (!buffer.hasRemaining())
+ {
+ reBuffer();
+ }
- buffer[bufferCursor()] = (byte) value;
+ buffer.put((byte) value);
- validBufferBytes += 1;
- current += 1;
isDirty = true;
syncNeeded = true;
}
- public void write(byte[] buffer) throws ClosedChannelException
+ public void write(byte[] buffer) throws IOException
{
- write(buffer, 0, buffer.length);
+ write(ByteBuffer.wrap(buffer, 0, buffer.length));
}
- public void write(byte[] data, int offset, int length) throws ClosedChannelException
+ public void write(byte[] data, int offset, int length) throws IOException
{
- if (buffer == null)
- throw new ClosedChannelException();
-
- while (length > 0)
- {
- int n = writeAtMost(data, offset, length);
- offset += n;
- length -= n;
- isDirty = true;
- syncNeeded = true;
- }
+ write(ByteBuffer.wrap(data, offset, length));
}
public int write(ByteBuffer src) throws IOException
@@ -166,75 +153,23 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
throw new ClosedChannelException();
int length = src.remaining();
- int offset = src.position();
- while (length > 0)
+ int finalLimit = src.limit();
+ while (src.hasRemaining())
{
- int n = writeAtMost(src, offset, length);
- offset += n;
- length -= n;
+ if (!buffer.hasRemaining())
+ reBuffer();
+
+ if (buffer.remaining() < src.remaining())
+ src.limit(src.position() + buffer.remaining());
+ buffer.put(src);
+ src.limit(finalLimit);
+
isDirty = true;
syncNeeded = true;
}
- src.position(offset);
return length;
}
- /*
- * Write at most "length" bytes from "data" starting at position "offset", and
- * return the number of bytes written. caller is responsible for setting
- * isDirty.
- */
- private int writeAtMost(ByteBuffer data, int offset, int length)
- {
- if (current >= bufferOffset + buffer.length)
- reBuffer();
-
- assert current < bufferOffset + buffer.length
- : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
-
- int toCopy = Math.min(length, buffer.length - bufferCursor());
-
- // copy bytes from external buffer
- ByteBufferUtil.arrayCopy(data, offset, buffer, bufferCursor(), toCopy);
-
- assert current <= bufferOffset + buffer.length
- : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
- validBufferBytes = Math.max(validBufferBytes, bufferCursor() + toCopy);
- current += toCopy;
-
- return toCopy;
- }
-
- /*
- * Write at most "length" bytes from "data" starting at position "offset", and
- * return the number of bytes written. caller is responsible for setting
- * isDirty.
- */
- private int writeAtMost(byte[] data, int offset, int length)
- {
- if (current >= bufferOffset + buffer.length)
- reBuffer();
-
- assert current < bufferOffset + buffer.length
- : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
-
- int toCopy = Math.min(length, buffer.length - bufferCursor());
-
- // copy bytes from external buffer
- System.arraycopy(data, offset, buffer, bufferCursor(), toCopy);
-
- assert current <= bufferOffset + buffer.length
- : String.format("File (%s) offset %d, buffer offset %d.", getPath(), current, bufferOffset);
-
- validBufferBytes = Math.max(validBufferBytes, bufferCursor() + toCopy);
- current += toCopy;
-
- return toCopy;
- }
-
/**
* Synchronize file contents with disk.
*/
@@ -247,7 +182,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- out.getFD().sync();
+ channel.force(false);
}
catch (IOException e)
{
@@ -291,7 +226,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
if (trickleFsync)
{
- bytesSinceTrickleFsync += validBufferBytes;
+ bytesSinceTrickleFsync += buffer.position();
if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
{
syncDataOnlyInternal();
@@ -320,8 +255,9 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- out.write(buffer, 0, validBufferBytes);
- lastFlushOffset += validBufferBytes;
+ buffer.flip();
+ channel.write(buffer);
+ lastFlushOffset += buffer.position();
}
catch (IOException e)
{
@@ -333,7 +269,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
public long getFilePointer()
{
- return current;
+ return current();
}
/**
@@ -342,7 +278,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
* size and not every write to the writer will modify this value.
* Furthermore, for compressed files, this value refers to compressed data, while the
* writer getFilePointer() refers to uncompressedFile
- *
+ *
* @return the current file pointer
*/
public long getOnDiskFilePointer()
@@ -354,7 +290,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- return Math.max(Math.max(current, out.length()), bufferOffset + validBufferBytes);
+ return Math.max(current(), channel.size());
}
catch (IOException e)
{
@@ -375,44 +311,48 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected void resetBuffer()
{
- bufferOffset = current;
- validBufferBytes = 0;
+ bufferOffset = current();
+ buffer.clear();
}
- private int bufferCursor()
+ protected long current()
{
- return (int) (current - bufferOffset);
+ return bufferOffset + (buffer == null ? 0 : buffer.position());
}
public FileMark mark()
{
- return new BufferedFileWriterMark(current);
+ return new BufferedFileWriterMark(current());
}
+ /**
+ * Drops all buffered data that's past the limits of our new file mark + buffer capacity, or syncs and truncates
+ * the underlying file to the marked position
+ */
public void resetAndTruncate(FileMark mark)
{
assert mark instanceof BufferedFileWriterMark;
- long previous = current;
- current = ((BufferedFileWriterMark) mark).pointer;
+ long previous = current();
+ long truncateTarget = ((BufferedFileWriterMark) mark).pointer;
- if (previous - current <= validBufferBytes) // current buffer
+ // If we're resetting to a point within our buffered data, just adjust our buffered position to drop bytes to
+ // the right of the desired mark.
+ if (previous - truncateTarget <= buffer.position())
{
- validBufferBytes = validBufferBytes - ((int) (previous - current));
+ buffer.position(buffer.position() - ((int) (previous - truncateTarget)));
return;
}
- // synchronize current buffer with disk
- // because we don't want any data loss
+ // synchronize current buffer with disk - we don't want any data loss
syncInternal();
// truncate file to given position
- truncate(current);
+ truncate(truncateTarget);
- // reset channel position
try
{
- out.seek(current);
+ channel.position(truncateTarget);
}
catch (IOException e)
{
@@ -431,7 +371,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
{
try
{
- out.getChannel().truncate(toSize);
+ channel.truncate(toSize);
}
catch (IOException e)
{
@@ -441,7 +381,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
public boolean isOpen()
{
- return out.getChannel().isOpen();
+ return channel.isOpen();
}
@Override
@@ -472,7 +412,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
// close is idempotent
- try { out.close(); }
+ try { channel.close(); }
catch (Throwable t) { handle(t, throwExceptions); }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f8a0ea7..2263e46 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -667,7 +667,8 @@ public class FBUtilities
{
directUpdate.invoke(checksum, bb);
return;
- } catch (IllegalAccessException e)
+ }
+ catch (IllegalAccessException e)
{
directUpdate = null;
logger.warn("JVM doesn't support Adler32 byte buffer access");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 58bf5cb..2eff19b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -55,6 +55,7 @@ public class CompressedRandomAccessReaderTest
testResetAndTruncate(File.createTempFile("compressed", "1"), true, 10);
testResetAndTruncate(File.createTempFile("compressed", "2"), true, CompressionParameters.DEFAULT_CHUNK_LENGTH);
}
+
@Test
public void test6791() throws IOException, ConfigurationException
{
@@ -105,7 +106,7 @@ public class CompressedRandomAccessReaderTest
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
SequentialWriter writer = compressed
? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
- : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH);
+ : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
writer.write("The quick ".getBytes());
FileMark mark = writer.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
new file mode 100644
index 0000000..ab904f9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class CompressedSequentialWriterTest
+{
+ private ICompressor compressor;
+
+ private void runTests(String testName) throws IOException
+ {
+ // Test small < 1 chunk data set
+ testWrite(File.createTempFile(testName + "_small", "1"), 25);
+
+ // Test to confirm pipeline w/chunk-aligned data writes works
+ testWrite(File.createTempFile(testName + "_chunkAligned", "1"), CompressionParameters.DEFAULT_CHUNK_LENGTH);
+
+ // Test to confirm pipeline on non-chunk boundaries works
+ testWrite(File.createTempFile(testName + "_large", "1"), CompressionParameters.DEFAULT_CHUNK_LENGTH * 3 + 100);
+ }
+
+ @Test
+ public void testLZ4Writer() throws IOException
+ {
+ compressor = LZ4Compressor.instance;
+ runTests("LZ4");
+ }
+
+ @Test
+ public void testDeflateWriter() throws IOException
+ {
+ compressor = DeflateCompressor.instance;
+ runTests("Deflate");
+ }
+
+ @Test
+ public void testSnappyWriter() throws IOException
+ {
+ compressor = SnappyCompressor.instance;
+ runTests("Snappy");
+ }
+
+ private void testWrite(File f, int bytesToTest) throws IOException
+ {
+ try
+ {
+ final String filename = f.getAbsolutePath();
+
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+ CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);
+
+ byte[] dataPre = new byte[bytesToTest];
+ byte[] rawPost = new byte[bytesToTest];
+ Random r = new Random();
+
+ // Test both write with byte[] and ByteBuffer
+ r.nextBytes(dataPre);
+ r.nextBytes(rawPost);
+ ByteBuffer dataPost = makeBB(bytesToTest);
+ dataPost.put(rawPost);
+ dataPost.flip();
+
+ writer.write(dataPre);
+ FileMark mark = writer.mark();
+
+ // Write enough garbage to transition chunk
+ for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+ {
+ writer.write((byte)i);
+ }
+ writer.resetAndTruncate(mark);
+ writer.write(dataPost);
+ writer.close();
+
+ assert f.exists();
+ RandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()));
+ assertEquals(dataPre.length + rawPost.length, reader.length());
+ byte[] result = new byte[(int)reader.length()];
+
+ reader.readFully(result);
+
+ assert(reader.isEOF());
+ reader.close();
+
+ byte[] fullInput = new byte[bytesToTest * 2];
+ System.arraycopy(dataPre, 0, fullInput, 0, dataPre.length);
+ System.arraycopy(rawPost, 0, fullInput, bytesToTest, rawPost.length);
+ assert Arrays.equals(result, fullInput);
+ }
+ finally
+ {
+ // cleanup
+ if (f.exists())
+ f.delete();
+ File metadata = new File(f + ".metadata");
+ if (metadata.exists())
+ metadata.delete();
+ }
+ }
+
+ private ByteBuffer makeBB(int size)
+ {
+ return compressor.useDirectOutputByteBuffers()
+ ? ByteBuffer.allocateDirect(size)
+ : ByteBuffer.allocate(size);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index 5df0b29..53021ee 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -20,17 +20,21 @@ package org.apache.cassandra.io.compress;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
-import com.google.common.io.Files;
-import org.apache.cassandra.io.compress.ICompressor.WrappedArray;
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.RandomAccessReader;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.*;
+import com.google.common.io.Files;
+
+import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class CompressorTest
{
@@ -42,7 +46,6 @@ public class CompressorTest
SnappyCompressor.create(Collections.<String, String>emptyMap())
};
-
@Test
public void testAllCompressors() throws IOException
{
@@ -57,69 +60,90 @@ public class CompressorTest
}
}
-
- public void test(byte[] data, int off, int len) throws IOException
+ public void testArrayUncompress(byte[] data, int off, int len) throws IOException
{
+ ByteBuffer src = makeBB(len);
+ src.put(data, off, len);
+ src.rewind();
+
final int outOffset = 3;
- final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(len)]);
- new Random().nextBytes(out.buffer);
- final int compressedLength = compressor.compress(data, off, len, out, outOffset);
- final int restoredOffset = 5;
- final byte[] restored = new byte[restoredOffset + len];
+ final WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(len));
+ fillBBWithRandom(compressed.buffer);
+ compressed.buffer.clear();
+ compressed.buffer.position(outOffset);
+
+ final int compressedLength = compressor.compress(src, compressed);
+
+ final int restoreOffset = 5;
+ final byte[] restored = new byte[restoreOffset + len];
new Random().nextBytes(restored);
- final int decompressedLength = compressor.uncompress(out.buffer, outOffset, compressedLength, restored, restoredOffset);
+
+ // need byte[] representation which direct buffers don't have
+ byte[] compressedBytes = new byte[compressed.buffer.capacity()];
+ ByteBufferUtil.arrayCopy(compressed.buffer, outOffset, compressedBytes, 0, compressed.buffer.capacity() - outOffset);
+
+ final int decompressedLength = compressor.uncompress(compressedBytes, 0, compressedLength, restored, restoreOffset);
+
assertEquals(decompressedLength, len);
assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
- Arrays.copyOfRange(restored, restoredOffset, restoredOffset + decompressedLength));
+ Arrays.copyOfRange(restored, restoreOffset, restoreOffset + decompressedLength));
}
- public void test(byte[] data) throws IOException
+ public void testArrayUncompress(byte[] data) throws IOException
{
- test(data, 0, data.length);
+ testArrayUncompress(data, 0, data.length);
}
public void testEmptyArray() throws IOException
{
- test(new byte[0]);
+ testArrayUncompress(new byte[0]);
}
public void testShortArray() throws UnsupportedEncodingException, IOException
{
- test("Cassandra".getBytes("UTF-8"), 1, 7);
+ testArrayUncompress("Cassandra".getBytes("UTF-8"), 1, 7);
}
public void testLongArray() throws UnsupportedEncodingException, IOException
{
byte[] data = new byte[1 << 20];
- test(data, 13, 1 << 19);
+ testArrayUncompress(data, 13, 1 << 19);
new Random(0).nextBytes(data);
- test(data, 13, 1 << 19);
+ testArrayUncompress(data, 13, 1 << 19);
}
public void testMappedFile() throws IOException
{
byte[] data = new byte[1 << 20];
new Random().nextBytes(data);
+ ByteBuffer src = makeBB(data.length);
+ src.put(data);
+ src.flip();
- //create a temp file
+ // create a temp file
File temp = File.createTempFile("tempfile", ".tmp");
temp.deleteOnExit();
- //Prepend some random bytes to the output and compress
+ // Prepend some random bytes to the output and compress
final int outOffset = 3;
- final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(data.length)]);
- new Random().nextBytes(out.buffer);
- final int compressedLength = compressor.compress(data, 0, data.length, out, outOffset);
- Files.write(out.buffer, temp);
+ byte[] garbage = new byte[outOffset + compressor.initialCompressedBufferLength(data.length)];
+ new Random().nextBytes(garbage);
+ WrappedByteBuffer dest = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(data.length));
+ dest.buffer.put(garbage);
+ dest.buffer.clear();
+ dest.buffer.position(outOffset);
+
+ final int compressedLength = compressor.compress(src, dest);
+
+ FileChannel channel = new FileOutputStream(temp, false).getChannel();
+ dest.buffer.clear();
+ channel.write(dest.buffer);
MappedByteBuffer mappedData = Files.map(temp);
mappedData.position(outOffset);
- mappedData.limit(compressedLength+outOffset);
-
+ mappedData.limit(compressedLength + outOffset);
- ByteBuffer result = compressor.useDirectOutputByteBuffers()
- ? ByteBuffer.allocateDirect(data.length + 100)
- : ByteBuffer.allocate(data.length + 100);
+ ByteBuffer result = makeBB(data.length + 100);
int length = compressor.uncompress(mappedData, result);
@@ -129,4 +153,79 @@ public class CompressorTest
Assert.assertEquals("Decompression mismatch at byte "+i, data[i], result.get());
}
}
+
+ @Test
+ public void testLZ4ByteBuffers() throws IOException
+ {
+ compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
+ testByteBuffers();
+ }
+
+ @Test
+ public void testDeflateByteBuffers() throws IOException
+ {
+ compressor = DeflateCompressor.create(Collections.<String, String>emptyMap());
+ testByteBuffers();
+ }
+
+ @Test
+ public void testSnappyByteBuffers() throws IOException
+ {
+ compressor = SnappyCompressor.create(Collections.<String, String>emptyMap());
+ testByteBuffers();
+ }
+
+ private void testByteBuffers() throws IOException
+ {
+ int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+ byte[] srcData = new byte[n];
+ new Random().nextBytes(srcData);
+
+ ByteBuffer src = makeBB(n);
+ src.put(srcData, 0, n);
+ src.flip();
+
+ int outOffset = 5;
+ ICompressor.WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(srcData.length));
+ byte[] garbage = new byte[compressed.buffer.capacity()];
+ new Random().nextBytes(garbage);
+ compressed.buffer.put(garbage);
+ compressed.buffer.clear();
+ compressed.buffer.position(outOffset);
+
+ compressor.compress(src, compressed);
+ compressed.buffer.flip();
+ compressed.buffer.position(outOffset);
+
+ ByteBuffer result = makeBB(outOffset + n);
+ int decompressed = compressor.uncompress(compressed.buffer, result);
+
+ assert decompressed == n;
+ for (int i = 0; i < n; ++i)
+ assert srcData[i] == result.get(i) : "Failed comparison on index: " + i + " with compressor: " + compressor.getClass().toString();
+ }
+
+ private ByteBuffer makeBB(int size)
+ {
+ return compressor.useDirectOutputByteBuffers()
+ ? ByteBuffer.allocateDirect(size)
+ : ByteBuffer.allocate(size);
+ }
+
+ private WrappedByteBuffer makeWrappedBB(int size)
+ {
+ return compressor.useDirectOutputByteBuffers()
+ ? new WrappedByteBuffer(ByteBuffer.allocateDirect(size))
+ : new WrappedByteBuffer(ByteBuffer.allocate(size));
+ }
+
+ private void fillBBWithRandom(ByteBuffer dest)
+ {
+ ByteBuffer dupe = dest.duplicate();
+ byte[] random = new byte[dest.capacity()];
+ new Random().nextBytes(random);
+ dest.clear();
+ dest.put(random);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index f2db428..624ca9b 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
public class DataOutputTest
{
-
@Test
public void testDataOutputStreamPlus() throws IOException
{
@@ -145,7 +144,7 @@ public class DataOutputTest
public void testSequentialWriter() throws IOException
{
File file = FileUtils.createTempFile("dataoutput", "test");
- final SequentialWriter writer = new SequentialWriter(file, 32);
+ final SequentialWriter writer = new SequentialWriter(file, 32, false);
DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
DataInput canon = testWrite(write);
write.flush();
@@ -162,32 +161,33 @@ public class DataOutputTest
final DataOutput canon = new DataOutputStream(bos);
Random rnd = ThreadLocalRandom.current();
- byte[] bytes = new byte[50];
+ int size = 50;
+ byte[] bytes = new byte[size];
rnd.nextBytes(bytes);
ByteBufferUtil.writeWithLength(bytes, test);
ByteBufferUtil.writeWithLength(bytes, canon);
- bytes = new byte[50];
+ bytes = new byte[size];
rnd.nextBytes(bytes);
ByteBufferUtil.writeWithLength(wrap(bytes, false), test);
ByteBufferUtil.writeWithLength(bytes, canon);
- bytes = new byte[50];
+ bytes = new byte[size];
rnd.nextBytes(bytes);
ByteBufferUtil.writeWithLength(wrap(bytes, true), test);
ByteBufferUtil.writeWithLength(bytes, canon);
- bytes = new byte[50];
+ bytes = new byte[size];
rnd.nextBytes(bytes);
ByteBufferUtil.writeWithShortLength(bytes, test);
ByteBufferUtil.writeWithShortLength(bytes, canon);
- bytes = new byte[50];
+ bytes = new byte[size];
rnd.nextBytes(bytes);
ByteBufferUtil.writeWithShortLength(wrap(bytes, false), test);
ByteBufferUtil.writeWithShortLength(bytes, canon);
- bytes = new byte[50];
+ bytes = new byte[size];
rnd.nextBytes(bytes);
ByteBufferUtil.writeWithShortLength(wrap(bytes, true), test);
ByteBufferUtil.writeWithShortLength(bytes, canon);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc7941c9/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 128ec3c..016deb3 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -117,8 +117,8 @@ public class CompressedInputStreamTest
for (int i = 0; i < sections.size(); i++)
{
input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ long readValue = in.readLong();
+ assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
}
}
}