You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/05/26 17:22:21 UTC
cassandra git commit: Improve ByteBuffer compression interface
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 28afa1c27 -> 3adfd1575
Improve ByteBuffer compression interface
Patch by blambov; reviewed by jmckenzie for CASSANDRA-9096
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3adfd157
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3adfd157
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3adfd157
Branch: refs/heads/cassandra-2.2
Commit: 3adfd1575cc3cf81397a7d148d5b2ed768b67014
Parents: 28afa1c
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue May 26 11:18:50 2015 -0400
Committer: Josh McKenzie <jo...@datastax.com>
Committed: Tue May 26 11:18:50 2015 -0400
----------------------------------------------------------------------
.../db/commitlog/CompressedSegment.java | 44 +++---
.../cassandra/io/compress/BufferType.java | 45 ++++++
.../compress/CompressedRandomAccessReader.java | 50 +++----
.../io/compress/CompressedSequentialWriter.java | 47 +++---
.../io/compress/DeflateCompressor.java | 133 +++++++++++++----
.../cassandra/io/compress/ICompressor.java | 41 +++--
.../cassandra/io/compress/LZ4Compressor.java | 57 ++++---
.../cassandra/io/compress/SnappyCompressor.java | 38 +++--
.../io/util/ChecksummedSequentialWriter.java | 6 +-
.../cassandra/io/util/RandomAccessReader.java | 13 +-
.../cassandra/io/util/SequentialWriter.java | 10 +-
.../cassandra/io/util/ThrottledReader.java | 4 +-
.../org/apache/cassandra/utils/FBUtilities.java | 9 +-
.../db/commitlog/CommitLogStressTest.java | 21 +--
.../io/compress/CompressorPerformance.java | 99 ++++++++++++
.../cassandra/io/RandomAccessReaderTest.java | 11 +-
.../CompressedRandomAccessReaderTest.java | 2 +-
.../CompressedSequentialWriterTest.java | 4 +-
.../cassandra/io/compress/CompressorTest.java | 149 +++++++++++--------
.../cassandra/io/util/DataOutputTest.java | 4 +-
.../cassandra/io/util/SequentialWriterTest.java | 4 +-
21 files changed, 504 insertions(+), 287 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index f3a80bc..73bc5e2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -24,8 +24,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
import org.apache.cassandra.io.util.FileUtils;
/*
@@ -34,10 +34,10 @@ import org.apache.cassandra.io.util.FileUtils;
*/
public class CompressedSegment extends CommitLogSegment
{
- static private final ThreadLocal<WrappedByteBuffer> compressedBufferHolder = new ThreadLocal<WrappedByteBuffer>() {
- protected WrappedByteBuffer initialValue()
+ static private final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
+ protected ByteBuffer initialValue()
{
- return new WrappedByteBuffer(ByteBuffer.allocate(0));
+ return ByteBuffer.allocate(0);
}
};
static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
@@ -69,17 +69,9 @@ public class CompressedSegment extends CommitLogSegment
}
}
- static ByteBuffer allocate(ICompressor compressor, int size)
- {
- if (compressor.useDirectOutputByteBuffers())
- return ByteBuffer.allocateDirect(size);
- else
- return ByteBuffer.allocate(size);
- }
-
ByteBuffer allocate(int size)
{
- return allocate(compressor, size);
+ return compressor.preferredBufferType().allocate(size);
}
ByteBuffer createBuffer(CommitLog commitLog)
@@ -88,7 +80,7 @@ public class CompressedSegment extends CommitLogSegment
if (buf == null)
{
// this.compressor is not yet set, so we must use the commitLog's one.
- buf = allocate(commitLog.compressor, DatabaseDescriptor.getCommitLogSegmentSize());
+ buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
} else
buf.clear();
return buf;
@@ -104,26 +96,24 @@ public class CompressedSegment extends CommitLogSegment
// The length may be 0 when the segment is being closed.
assert length > 0 || length == 0 && !isStillAllocating();
- try {
-
- int compressedLength = compressor.initialCompressedBufferLength(length);
- WrappedByteBuffer wrappedCompressedBuffer = compressedBufferHolder.get();
- ByteBuffer compressedBuffer = wrappedCompressedBuffer.buffer;
- if (compressedBuffer.isDirect() != compressor.useDirectOutputByteBuffers() ||
- compressedBuffer.capacity() < compressedLength + COMPRESSED_MARKER_SIZE)
+ try
+ {
+ int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
+ ByteBuffer compressedBuffer = compressedBufferHolder.get();
+ if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
+ compressedBuffer.capacity() < neededBufferSize)
{
- compressedBuffer = allocate(compressedLength + COMPRESSED_MARKER_SIZE);
- FileUtils.clean(wrappedCompressedBuffer.buffer);
- wrappedCompressedBuffer.buffer = compressedBuffer;
+ FileUtils.clean(compressedBuffer);
+ compressedBuffer = allocate(neededBufferSize);
+ compressedBufferHolder.set(compressedBuffer);
}
ByteBuffer inputBuffer = buffer.duplicate();
inputBuffer.limit(contentStart + length).position(contentStart);
compressedBuffer.limit(compressedBuffer.capacity()).position(COMPRESSED_MARKER_SIZE);
- compressedLength = compressor.compress(inputBuffer, wrappedCompressedBuffer);
+ compressor.compress(inputBuffer, compressedBuffer);
- compressedBuffer.position(0);
- compressedBuffer.limit(COMPRESSED_MARKER_SIZE + compressedLength);
+ compressedBuffer.flip();
compressedBuffer.putInt(SYNC_MARKER_SIZE, length);
// Only one thread can be here at a given time.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/BufferType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/BufferType.java b/src/java/org/apache/cassandra/io/compress/BufferType.java
new file mode 100644
index 0000000..8817802
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/BufferType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.nio.ByteBuffer;
+
+public enum BufferType
+{
+ ON_HEAP
+ {
+ public ByteBuffer allocate(int size)
+ {
+ return ByteBuffer.allocate(size);
+ }
+ },
+ OFF_HEAP
+ {
+ public ByteBuffer allocate(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+ };
+
+ public abstract ByteBuffer allocate(int size);
+
+ public static BufferType typeOf(ByteBuffer buffer)
+ {
+ return buffer.isDirect() ? OFF_HEAP : ON_HEAP;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 1febe37..e6ac60a 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.io.compress;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
@@ -29,8 +28,6 @@ import java.util.zip.Adler32;
import com.google.common.primitives.Ints;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
@@ -81,24 +78,23 @@ public class CompressedRandomAccessReader extends RandomAccessReader
protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException
{
- super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
+ super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
this.metadata = metadata;
checksum = new Adler32();
chunkSegments = file == null ? null : file.chunkSegments();
if (chunkSegments == null)
{
- compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().useDirectOutputByteBuffers());
+ compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().preferredBufferType());
checksumBytes = ByteBuffer.wrap(new byte[4]);
}
}
- protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
+ @Override
+ protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType)
{
assert Integer.bitCount(bufferSize) == 1;
- return useDirect
- ? ByteBuffer.allocateDirect(bufferSize)
- : ByteBuffer.allocate(bufferSize);
+ return bufferType.allocate(bufferSize);
}
@Override
@@ -120,33 +116,32 @@ public class CompressedRandomAccessReader extends RandomAccessReader
CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
if (compressed.capacity() < chunk.length)
- compressed = ByteBuffer.wrap(new byte[chunk.length]);
+ compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType());
else
compressed.clear();
compressed.limit(chunk.length);
if (channel.read(compressed, chunk.offset) != chunk.length)
throw new CorruptBlockException(getPath(), chunk);
-
- // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
- // in the future this will save a lot of hair-pulling
compressed.flip();
buffer.clear();
- int decompressedBytes;
+
try
{
- decompressedBytes = metadata.compressor().uncompress(compressed, buffer);
- buffer.limit(decompressedBytes);
+ metadata.compressor().uncompress(compressed, buffer);
}
catch (IOException e)
{
- buffer.limit(0);
throw new CorruptBlockException(getPath(), chunk);
}
+ finally
+ {
+ buffer.flip();
+ }
if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
- compressed.position(0);
+ compressed.rewind();
FBUtilities.directCheckSum(checksum, compressed);
if (checksum(chunk) != (int) checksum.getValue())
@@ -186,39 +181,32 @@ public class CompressedRandomAccessReader extends RandomAccessReader
Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
long segmentOffset = entry.getKey();
int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
- ByteBuffer compressedChunk = entry.getValue().duplicate();
+ ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
- compressedChunk.position(chunkOffset);
- compressedChunk.limit(chunkOffset + chunk.length);
- compressedChunk.mark();
+ compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
buffer.clear();
- int decompressedBytes;
+
try
{
- decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer);
- buffer.limit(decompressedBytes);
+ metadata.compressor().uncompress(compressedChunk, buffer);
}
catch (IOException e)
{
- buffer.limit(0);
throw new CorruptBlockException(getPath(), chunk);
}
finally
{
- compressedChunk.limit(compressedChunk.capacity());
+ buffer.flip();
}
if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
- compressedChunk.reset();
- compressedChunk.limit(chunkOffset + chunk.length);
+ compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
FBUtilities.directCheckSum(checksum, compressedChunk);
compressedChunk.limit(compressedChunk.capacity());
-
-
if (compressedChunk.getInt() != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunk);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6218526..9c7c776 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -47,7 +47,7 @@ public class CompressedSequentialWriter extends SequentialWriter
private final ICompressor compressor;
// used to store compressed data
- private final ICompressor.WrappedByteBuffer compressed;
+ private ByteBuffer compressed;
// holds a number of already written chunks
private int chunkCount = 0;
@@ -63,13 +63,11 @@ public class CompressedSequentialWriter extends SequentialWriter
CompressionParameters parameters,
MetadataCollector sstableMetadataCollector)
{
- super(file, parameters.chunkLength(), parameters.sstableCompressor.useDirectOutputByteBuffers());
+ super(file, parameters.chunkLength(), parameters.sstableCompressor.preferredBufferType());
this.compressor = parameters.sstableCompressor;
// buffer for compression should be the same size as buffer itself
- compressed = compressor.useDirectOutputByteBuffers()
- ? new ICompressor.WrappedByteBuffer(ByteBuffer.allocateDirect(compressor.initialCompressedBufferLength(buffer.capacity())))
- : new ICompressor.WrappedByteBuffer(ByteBuffer.allocate(compressor.initialCompressedBufferLength(buffer.capacity())));
+ compressed = compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity()));
/* Index File (-CompressionInfo.db component) and it's header */
metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
@@ -102,22 +100,19 @@ public class CompressedSequentialWriter extends SequentialWriter
{
seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
- int compressedLength;
try
{
// compressing data with buffer re-use
buffer.flip();
- compressed.buffer.clear();
- compressedLength = compressor.compress(buffer, compressed);
-
- // Compressors don't modify sentinels in our BB - we rely on buffer.position() for bufferOffset adjustment
- buffer.position(buffer.limit());
+ compressed.clear();
+ compressor.compress(buffer, compressed);
}
catch (IOException e)
{
throw new RuntimeException("Compression exception", e); // shouldn't happen
}
+ int compressedLength = compressed.position();
uncompressedSize += buffer.position();
compressedSize += compressedLength;
@@ -127,15 +122,13 @@ public class CompressedSequentialWriter extends SequentialWriter
metadataWriter.addOffset(chunkOffset);
chunkCount++;
- assert compressedLength <= compressed.buffer.capacity();
-
// write out the compressed data
- compressed.buffer.flip();
- channel.write(compressed.buffer);
+ compressed.flip();
+ channel.write(compressed);
// write corresponding checksum
- compressed.buffer.rewind();
- crcMetadata.appendDirect(compressed.buffer, true);
+ compressed.rewind();
+ crcMetadata.appendDirect(compressed, true);
lastFlushOffset += compressedLength + 4;
// adjust our bufferOffset to account for the new uncompressed data we've now written out
@@ -189,24 +182,22 @@ public class CompressedSequentialWriter extends SequentialWriter
// compressed chunk size (- 4 bytes reserved for checksum)
int chunkSize = (int) (metadataWriter.chunkOffsetBy(realMark.nextChunkIndex) - chunkOffset - 4);
- if (compressed.buffer.capacity() < chunkSize)
- compressed.buffer = compressor.useDirectOutputByteBuffers()
- ? ByteBuffer.allocateDirect(chunkSize)
- : ByteBuffer.allocate(chunkSize);
+ if (compressed.capacity() < chunkSize)
+ compressed = compressor.preferredBufferType().allocate(chunkSize);
try
{
- compressed.buffer.clear();
- compressed.buffer.limit(chunkSize);
+ compressed.clear();
+ compressed.limit(chunkSize);
channel.position(chunkOffset);
- channel.read(compressed.buffer);
+ channel.read(compressed);
try
{
// Repopulate buffer from compressed data
buffer.clear();
- compressed.buffer.flip();
- compressor.uncompress(compressed.buffer, buffer);
+ compressed.flip();
+ compressor.uncompress(compressed, buffer);
}
catch (IOException e)
{
@@ -214,8 +205,8 @@ public class CompressedSequentialWriter extends SequentialWriter
}
Adler32 checksum = new Adler32();
-
- FBUtilities.directCheckSum(checksum, compressed.buffer);
+ compressed.rewind();
+ FBUtilities.directCheckSum(checksum, compressed);
crcCheckBuffer.clear();
channel.read(crcCheckBuffer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 833c375..f2ccd64 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.io.compress;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -66,45 +66,124 @@ public class DeflateCompressor implements ICompressor
return Collections.emptySet();
}
- public int initialCompressedBufferLength(int chunkLength)
+ public int initialCompressedBufferLength(int sourceLen)
{
- return chunkLength;
+ // Taken from zlib deflateBound(). See http://www.zlib.net/zlib_tech.html.
+ return sourceLen + (sourceLen >> 12) + (sourceLen >> 14) + (sourceLen >> 25) + 13;
}
- public int compress(ByteBuffer src, ICompressor.WrappedByteBuffer dest)
+ public void compress(ByteBuffer input, ByteBuffer output)
{
- assert dest.buffer.hasArray();
+ if (input.hasArray() && output.hasArray())
+ {
+ int length = compressArray(input.array(), input.arrayOffset() + input.position(), input.remaining(),
+ output.array(), output.arrayOffset() + output.position(), output.remaining());
+ input.position(input.limit());
+ output.position(output.position() + length);
+ }
+ else
+ compressBuffer(input, output);
+ }
+ public int compressArray(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
+ {
Deflater def = deflater.get();
def.reset();
- def.setInput(src.array(), src.arrayOffset() + src.position(), src.remaining());
+ def.setInput(input, inputOffset, inputLength);
def.finish();
if (def.needsInput())
return 0;
- int startPos = dest.buffer.position();
- while (true)
+ int len = def.deflate(output, outputOffset, maxOutputLength);
+ assert def.finished();
+ return len;
+ }
+
+ public void compressBuffer(ByteBuffer input, ByteBuffer output)
+ {
+ Deflater def = deflater.get();
+ def.reset();
+
+ byte[] buffer = FBUtilities.getThreadLocalScratchBuffer();
+ // Use half the buffer for input, half for output.
+ int chunkLen = buffer.length / 2;
+ while (input.remaining() > chunkLen)
+ {
+ input.get(buffer, 0, chunkLen);
+ def.setInput(buffer, 0, chunkLen);
+ while (!def.needsInput())
+ {
+ int len = def.deflate(buffer, chunkLen, chunkLen);
+ output.put(buffer, chunkLen, len);
+ }
+ }
+ int inputLength = input.remaining();
+ input.get(buffer, 0, inputLength);
+ def.setInput(buffer, 0, inputLength);
+ def.finish();
+ while (!def.finished())
+ {
+ int len = def.deflate(buffer, chunkLen, chunkLen);
+ output.put(buffer, chunkLen, len);
+ }
+ }
+
+
+ public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ {
+ if (input.hasArray() && output.hasArray())
+ {
+ int length = uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(),
+ output.array(), output.arrayOffset() + output.position(), output.remaining());
+ input.position(input.limit());
+ output.position(output.position() + length);
+ }
+ else
+ uncompressBuffer(input, output);
+ }
+
+ public void uncompressBuffer(ByteBuffer input, ByteBuffer output) throws IOException
+ {
+ try
{
- int arrayOffset = dest.buffer.arrayOffset();
- int len = def.deflate(dest.buffer.array(), arrayOffset + dest.buffer.position(), dest.buffer.remaining());
- dest.buffer.position(dest.buffer.position() + len);
- if (def.finished())
+ Inflater inf = inflater.get();
+ inf.reset();
+
+ byte[] buffer = FBUtilities.getThreadLocalScratchBuffer();
+ // Use half the buffer for input, half for output.
+ int chunkLen = buffer.length / 2;
+ while (input.remaining() > chunkLen)
{
- return dest.buffer.position() - startPos;
+ input.get(buffer, 0, chunkLen);
+ inf.setInput(buffer, 0, chunkLen);
+ while (!inf.needsInput())
+ {
+ int len = inf.inflate(buffer, chunkLen, chunkLen);
+ output.put(buffer, chunkLen, len);
+ }
}
- else
+ int inputLength = input.remaining();
+ input.get(buffer, 0, inputLength);
+ inf.setInput(buffer, 0, inputLength);
+ while (!inf.needsInput())
{
- // We're not done, output was too small. Increase it and continue
- ByteBuffer newDest = ByteBuffer.allocate(dest.buffer.capacity()*4/3 + 1);
- dest.buffer.rewind();
- newDest.put(dest.buffer);
- dest.buffer = newDest;
+ int len = inf.inflate(buffer, chunkLen, chunkLen);
+ output.put(buffer, chunkLen, len);
}
}
+ catch (DataFormatException e)
+ {
+ throw new IOException(e);
+ }
}
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
{
+ return uncompress(input, inputOffset, inputLength, output, outputOffset, output.length - outputOffset);
+ }
+
+ public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) throws IOException
+ {
Inflater inf = inflater.get();
inf.reset();
inf.setInput(input, inputOffset, inputLength);
@@ -114,7 +193,7 @@ public class DeflateCompressor implements ICompressor
// We assume output is big enough
try
{
- return inf.inflate(output, outputOffset, output.length - outputOffset);
+ return inf.inflate(output, outputOffset, maxOutputLength);
}
catch (DataFormatException e)
{
@@ -122,18 +201,14 @@ public class DeflateCompressor implements ICompressor
}
}
- public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ public boolean supports(BufferType bufferType)
{
- if (!output.hasArray())
- throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");
-
- if (input.hasArray())
- return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
- return uncompress(ByteBufferUtil.getArray(input), 0, input.remaining(), output.array(), output.arrayOffset() + output.position());
+ return true;
}
- public boolean useDirectOutputByteBuffers()
+ public BufferType preferredBufferType()
{
- return false;
+ // Prefer array-backed buffers.
+ return BufferType.ON_HEAP;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/ICompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index 0326a9f..5719834 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -28,38 +28,33 @@ public interface ICompressor
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
/**
- * Compression for ByteBuffers
+ * Compression for ByteBuffers.
+ *
+ * The data between input.position() and input.limit() is compressed and placed into output starting from output.position().
+ * Positions in both buffers are moved to reflect the bytes read and written. Limits are not changed.
*/
- public int compress(ByteBuffer input, WrappedByteBuffer output) throws IOException;
+ public void compress(ByteBuffer input, ByteBuffer output) throws IOException;
/**
- * Decompression for DirectByteBuffers
+ * Decompression for DirectByteBuffers.
+ *
+ * The data between input.position() and input.limit() is uncompressed and placed into output starting from output.position().
+ * Positions in both buffers are moved to reflect the bytes read and written. Limits are not changed.
*/
- public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
+ public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
/**
- * Notifies user if this compressor will wants/requires a direct byte buffers to
- * decompress direct byteBuffers
+ * Returns the preferred (most efficient) buffer type for this compressor.
*/
- public boolean useDirectOutputByteBuffers();
-
- public Set<String> supportedOptions();
+ public BufferType preferredBufferType();
/**
- * A simple wrapped Bytebuffer.
- * Not all implementations allow us to know the maximum size after
- * compression. This makes it hard to size the output buffer for compression
- * (and we want to reuse the buffer). Instead we use this wrapped ByteBuffer
- * so that compress(...) can have the liberty to resize the underlying array if
- * necessary.
+ * Checks if the given buffer would be supported by the compressor. If a type is supported the compressor must be
+ * able to use it in combination with all other supported types.
+ *
+ * Direct and memory-mapped buffers must be supported by all compressors.
*/
- public static class WrappedByteBuffer
- {
- public ByteBuffer buffer;
+ public boolean supports(BufferType bufferType);
- public WrappedByteBuffer(ByteBuffer buffer)
- {
- this.buffer = buffer;
- }
- }
+ public Set<String> supportedOptions();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 9d54048..5fd4309 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -55,20 +55,17 @@ public class LZ4Compressor implements ICompressor
return INTEGER_BYTES + compressor.maxCompressedLength(chunkLength);
}
- public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
+ public void compress(ByteBuffer input, ByteBuffer output) throws IOException
{
- final ByteBuffer buf = dest.buffer;
- int len = src.remaining();
- dest.buffer.put((byte) len);
- dest.buffer.put((byte) (len >>> 8));
- dest.buffer.put((byte) (len >>> 16));
- dest.buffer.put((byte) (len >>> 24));
-
- int start = dest.buffer.position();
+ int len = input.remaining();
+ output.put((byte) len);
+ output.put((byte) (len >>> 8));
+ output.put((byte) (len >>> 16));
+ output.put((byte) (len >>> 24));
+
try
{
- compressor.compress(src, dest.buffer);
- return INTEGER_BYTES + (buf.position() - start);
+ compressor.compress(input, output);
}
catch (LZ4Exception e)
{
@@ -103,44 +100,42 @@ public class LZ4Compressor implements ICompressor
return decompressedLength;
}
- public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
- if (input.hasArray() && output.hasArray())
- return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
-
- int pos = input.position();
- final int decompressedLength = (input.get(pos) & 0xFF)
- | ((input.get(pos + 1) & 0xFF) << 8)
- | ((input.get(pos + 2) & 0xFF) << 16)
- | ((input.get(pos + 3) & 0xFF) << 24);
- int inputLength = input.remaining() - INTEGER_BYTES;
+ final int decompressedLength = (input.get() & 0xFF)
+ | ((input.get() & 0xFF) << 8)
+ | ((input.get() & 0xFF) << 16)
+ | ((input.get() & 0xFF) << 24);
- final int compressedLength;
try
{
- compressedLength = decompressor.decompress(input, input.position() + INTEGER_BYTES, output, output.position(), decompressedLength);
+ int compressedLength = decompressor.decompress(input, input.position(), output, output.position(), decompressedLength);
+ input.position(input.position() + compressedLength);
+ output.position(output.position() + decompressedLength);
}
catch (LZ4Exception e)
{
throw new IOException(e);
}
- if (compressedLength != inputLength)
+ if (input.remaining() > 0)
{
- throw new IOException("Compressed lengths mismatch - got: "+compressedLength+" vs expected: "+inputLength);
+ throw new IOException("Compressed lengths mismatch - "+input.remaining()+" bytes remain");
}
+ }
- return decompressedLength;
+ public Set<String> supportedOptions()
+ {
+ return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
}
- @Override
- public boolean useDirectOutputByteBuffers()
+ public BufferType preferredBufferType()
{
- return true;
+ return BufferType.OFF_HEAP;
}
- public Set<String> supportedOptions()
+ public boolean supports(BufferType bufferType)
{
- return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index 04f676b..9fc170a 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -79,14 +79,15 @@ public class SnappyCompressor implements ICompressor
return Snappy.maxCompressedLength(chunkLength);
}
- public int compress(ByteBuffer src, WrappedByteBuffer dest) throws IOException
+ public void compress(ByteBuffer input, ByteBuffer output) throws IOException
{
- int result = Snappy.compress(src, dest.buffer);
+ int dlimit = output.limit();
+ Snappy.compress(input, output);
- // Snappy doesn't match LZ4 and Deflate w/regards to state it leaves dest ByteBuffer's counters in
- dest.buffer.position(dest.buffer.limit());
- dest.buffer.limit(dest.buffer.capacity());
- return result;
+ // Snappy doesn't match the ICompressor contract w/regards to state it leaves dest ByteBuffer's counters in
+ output.position(output.limit());
+ output.limit(dlimit);
+ input.position(input.limit());
}
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
@@ -94,16 +95,27 @@ public class SnappyCompressor implements ICompressor
return Snappy.rawUncompress(input, inputOffset, inputLength, output, outputOffset);
}
- public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ public void uncompress(ByteBuffer input, ByteBuffer output)
+ throws IOException
{
- if (input.hasArray() && output.hasArray())
- return Snappy.rawUncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
- return Snappy.uncompress(input, output);
+ int dlimit = output.limit();
+ Snappy.uncompress(input, output);
+
+ // Snappy doesn't match the ICompressor contract w/regards to state it leaves dest ByteBuffer's counters in
+ output.position(output.limit());
+ output.limit(dlimit);
+ input.position(input.limit());
+ }
+
+ public BufferType preferredBufferType()
+ {
+ return BufferType.OFF_HEAP;
}
- @Override
- public boolean useDirectOutputByteBuffers()
+ public boolean supports(BufferType bufferType)
{
- return true;
+ // Snappy can't deal with different input and output buffer types.
+ // To avoid possible problems, pretend it can't support array-backed at all.
+ return bufferType == BufferType.OFF_HEAP;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index ec68c2d..d5e6be9 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.util;
import java.io.File;
import java.nio.ByteBuffer;
+import org.apache.cassandra.io.compress.BufferType;
+
public class ChecksummedSequentialWriter extends SequentialWriter
{
private final SequentialWriter crcWriter;
@@ -27,8 +29,8 @@ public class ChecksummedSequentialWriter extends SequentialWriter
public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
{
- super(file, bufferSize, false);
- crcWriter = new SequentialWriter(crcPath, 8 * 1024, false);
+ super(file, bufferSize, BufferType.ON_HEAP);
+ crcWriter = new SequentialWriter(crcPath, 8 * 1024, BufferType.ON_HEAP);
crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
crcMetadata.writeChunkSize(buffer.capacity());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 328095b..302f054 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.ByteBufferUtil;
public class RandomAccessReader extends AbstractDataInput implements FileDataInput
@@ -45,7 +46,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
protected final PoolingSegmentedFile owner;
- protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, boolean useDirectBuffer, PoolingSegmentedFile owner)
+ protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, BufferType bufferType, PoolingSegmentedFile owner)
{
this.channel = channel.sharedCopy();
this.owner = owner;
@@ -57,16 +58,14 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
// we can cache file length in read-only mode
fileLength = overrideLength <= 0 ? channel.size() : overrideLength;
- buffer = allocateBuffer(bufferSize, useDirectBuffer);
+ buffer = allocateBuffer(bufferSize, bufferType);
buffer.limit(0);
}
- protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirectBuffer)
+ protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType)
{
int size = (int) Math.min(fileLength, bufferSize);
- return useDirectBuffer
- ? ByteBuffer.allocateDirect(size)
- : ByteBuffer.allocate(size);
+ return bufferType.allocate(size);
}
public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner)
@@ -100,7 +99,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
private static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize, PoolingSegmentedFile owner)
{
- return new RandomAccessReader(channel, bufferSize, overrideSize, false, owner);
+ return new RandomAccessReader(channel, bufferSize, overrideSize, BufferType.ON_HEAP, owner);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 3c35a34..304f702 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -27,6 +27,7 @@ import java.nio.file.StandardOpenOption;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -49,7 +50,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private final String filePath;
protected ByteBuffer buffer;
- private final int fd;
private int directoryFD;
// directory should be synced only after first file sync, in other words, only once per file
private boolean directorySynced = false;
@@ -119,7 +119,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
- public SequentialWriter(File file, int bufferSize, boolean offheap)
+ public SequentialWriter(File file, int bufferSize, BufferType bufferType)
{
try
{
@@ -136,13 +136,11 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
filePath = file.getAbsolutePath();
// Allow children to allocate buffer as direct (snappy compression) if necessary
- buffer = offheap ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+ buffer = bufferType.allocate(bufferSize);
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
- fd = CLibrary.getfd(channel);
-
directoryFD = CLibrary.tryOpenDirectory(file.getParent());
stream = new WrappedDataOutputStreamPlus(this, this);
}
@@ -152,7 +150,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
*/
public static SequentialWriter open(File file)
{
- return new SequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
+ return new SequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, BufferType.ON_HEAP);
}
public static ChecksummedSequentialWriter open(File file, File crcPath)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index 38e5cff..f725984 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -25,13 +25,15 @@ import java.io.FileNotFoundException;
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.io.compress.BufferType;
+
public class ThrottledReader extends RandomAccessReader
{
private final RateLimiter limiter;
protected ThrottledReader(ChannelProxy channel, long overrideLength, RateLimiter limiter) throws FileNotFoundException
{
- super(channel, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, false, null);
+ super(channel, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, BufferType.ON_HEAP, null);
this.limiter = limiter;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index b61f956..c20e33e 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -638,7 +638,7 @@ public class FBUtilities
}
}
- private static final ThreadLocal<byte[]> localDigestBuffer = new ThreadLocal<byte[]>()
+ private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
{
@Override
protected byte[] initialValue()
@@ -647,6 +647,11 @@ public class FBUtilities
}
};
+ public static byte[] getThreadLocalScratchBuffer()
+ {
+ return threadLocalScratchBuffer.get();
+ }
+
//Java 7 has this method but it's private till Java 8. Thanks JDK!
public static boolean supportsDirectChecksum()
{
@@ -674,7 +679,7 @@ public class FBUtilities
}
//Fallback
- byte[] buffer = localDigestBuffer.get();
+ byte[] buffer = getThreadLocalScratchBuffer();
int remaining;
while ((remaining = bb.remaining()) > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index a8cf8fd..7f9df9e 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Assert;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.RateLimiter;
import org.junit.BeforeClass;
@@ -53,7 +53,6 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
@@ -110,7 +109,7 @@ public class CommitLogStressTest
CommitLogStressTest tester = new CommitLogStressTest();
tester.testFixedSize();
}
- catch (Exception e)
+ catch (Throwable e)
{
e.printStackTrace(System.err);
}
@@ -272,7 +271,7 @@ public class CommitLogStressTest
{
stop = false;
for (int ii = 0; ii < NUM_THREADS; ii++) {
- final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+ final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii));
threads.add(t);
t.start();
}
@@ -313,7 +312,7 @@ public class CommitLogStressTest
return maxMemory / (1024 * 1024);
}
- public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
+ public static ByteBuffer randomBytes(int quantity, Random tlr) {
ByteBuffer slice = ByteBuffer.allocate(quantity);
ByteBuffer source = dataSource.duplicate();
source.position(tlr.nextInt(source.capacity() - quantity));
@@ -329,27 +328,29 @@ public class CommitLogStressTest
int cells = 0;
int dataSize = 0;
final CommitLog commitLog;
+ final Random random;
volatile ReplayPosition rp;
- public CommitlogExecutor(CommitLog commitLog)
+ public CommitlogExecutor(CommitLog commitLog, Random rand)
{
this.commitLog = commitLog;
+ this.random = rand;
}
public void run() {
RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
- final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+ final Random rand = random != null ? random : ThreadLocalRandom.current();
while (!stop) {
if (rl != null)
rl.acquire();
String ks = "Keyspace1";
- ByteBuffer key = randomBytes(16, tlr);
+ ByteBuffer key = randomBytes(16, rand);
Mutation mutation = new Mutation(ks, key);
for (int ii = 0; ii < numCells; ii++) {
- int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
- ByteBuffer bytes = randomBytes(sz, tlr);
+ int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
+ ByteBuffer bytes = randomBytes(sz, rand);
mutation.add("Standard1", Util.cellname("name" + ii), bytes,
System.currentTimeMillis());
hash = hash(hash, bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
new file mode 100644
index 0000000..7401951
--- /dev/null
+++ b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java
@@ -0,0 +1,99 @@
+package org.apache.cassandra.io.compress;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class CompressorPerformance
+{
+
+ static public void testPerformances() throws IOException
+ {
+ for (ICompressor compressor: new ICompressor[] {
+ SnappyCompressor.instance, // warm up
+ DeflateCompressor.instance,
+ LZ4Compressor.instance,
+ SnappyCompressor.instance
+ })
+ {
+ for (BufferType in: BufferType.values())
+ {
+ if (compressor.supports(in))
+ {
+ for (BufferType out: BufferType.values())
+ {
+ if (compressor.supports(out))
+ {
+ for (int i=0; i<10; ++i)
+ testPerformance(compressor, in, out);
+ System.out.println();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ static ByteBuffer dataSource;
+ static int bufLen;
+
+ static private void testPerformance(ICompressor compressor, BufferType in, BufferType out) throws IOException
+ {
+ int len = dataSource.capacity();
+ int bufLen = compressor.initialCompressedBufferLength(len);
+ ByteBuffer input = in.allocate(bufLen);
+ ByteBuffer output = out.allocate(bufLen);
+
+ int checksum = 0;
+ int count = 100;
+
+ long time = System.nanoTime();
+ for (int i=0; i<count; ++i)
+ {
+ output.clear();
+ compressor.compress(dataSource, output);
+ // Make sure not optimized away.
+ checksum += output.get(ThreadLocalRandom.current().nextInt(output.position()));
+ dataSource.rewind();
+ }
+ long timec = System.nanoTime() - time;
+ output.flip();
+ input.put(output);
+ input.flip();
+
+ time = System.nanoTime();
+ for (int i=0; i<count; ++i)
+ {
+ output.clear();
+ compressor.uncompress(input, output);
+ // Make sure not optimized away.
+ checksum += output.get(ThreadLocalRandom.current().nextInt(output.position()));
+ input.rewind();
+ }
+ long timed = System.nanoTime() - time;
+ System.out.format("Compressor %s %s->%s compress %.3f ns/b %.3f mb/s uncompress %.3f ns/b %.3f mb/s.%s\n",
+ compressor.getClass().getSimpleName(),
+ in,
+ out,
+ 1.0 * timec / (count * len),
+ Math.scalb(1.0e9, -20) * count * len / timec,
+ 1.0 * timed / (count * len),
+ Math.scalb(1.0e9, -20) * count * len / timed,
+ checksum == 0 ? " " : "");
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+ {
+ int len = (int)fis.getChannel().size();
+ dataSource = ByteBuffer.allocateDirect(len);
+ while (dataSource.hasRemaining()) {
+ fis.getChannel().read(dataSource);
+ }
+ dataSource.flip();
+ }
+ testPerformances();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
index fe04096..71fab61 100644
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
@@ -3,9 +3,7 @@ package org.apache.cassandra.io;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
-import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -14,7 +12,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import static org.junit.Assert.*;
-import org.apache.cassandra.io.compress.*;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
@@ -28,7 +25,7 @@ public class RandomAccessReaderTest
final File f = File.createTempFile("testReadFully", "1");
final String expected = "The quick brown fox jumps over the lazy dog";
- SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+ SequentialWriter writer = SequentialWriter.open(f);
writer.write(expected.getBytes());
writer.finish();
@@ -56,7 +53,7 @@ public class RandomAccessReaderTest
File f = File.createTempFile("testReadBytes", "1");
final String expected = "The quick brown fox jumps over the lazy dog";
- SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+ SequentialWriter writer = SequentialWriter.open(f);
writer.write(expected.getBytes());
writer.finish();
@@ -84,7 +81,7 @@ public class RandomAccessReaderTest
final String expected = "The quick brown fox jumps over the lazy dog";
final int numIterations = 10;
- SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+ SequentialWriter writer = SequentialWriter.open(f);
for (int i = 0; i < numIterations; i++)
writer.write(expected.getBytes());
writer.finish();
@@ -163,7 +160,7 @@ public class RandomAccessReaderTest
}
final int totalLength = len;
- SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+ SequentialWriter writer = SequentialWriter.open(f);
for (int i = 0; i < expected.length; i++)
writer.write(expected[i].getBytes());
writer.finish();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index cfc4bb8..b013206 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -110,7 +110,7 @@ public class CompressedRandomAccessReaderTest
MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
SequentialWriter writer = compressed
? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
- : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+ : SequentialWriter.open(f);
writer.write("The quick ".getBytes());
FileMark mark = writer.mark();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 184319f..27b866d 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -142,9 +142,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
private ByteBuffer makeBB(int size)
{
- return compressor.useDirectOutputByteBuffers()
- ? ByteBuffer.allocateDirect(size)
- : ByteBuffer.allocate(size);
+ return compressor.preferredBufferType().allocate(size);
}
private final List<TestableCSW> writers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index 53021ee..6018cc7 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -21,19 +21,17 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
+import com.google.common.io.Files;
import static org.junit.Assert.*;
-
-import org.apache.cassandra.io.util.RandomAccessReader;
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.io.Files;
-
-import org.apache.cassandra.io.compress.ICompressor.WrappedByteBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
public class CompressorTest
@@ -62,27 +60,29 @@ public class CompressorTest
public void testArrayUncompress(byte[] data, int off, int len) throws IOException
{
- ByteBuffer src = makeBB(len);
+ final int inOffset = 2;
+ ByteBuffer src = makeBB(len + inOffset);
+ src.position(inOffset);
src.put(data, off, len);
- src.rewind();
+ src.flip().position(inOffset);
final int outOffset = 3;
- final WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(len));
- fillBBWithRandom(compressed.buffer);
- compressed.buffer.clear();
- compressed.buffer.position(outOffset);
+ final ByteBuffer compressed = makeBB(outOffset + compressor.initialCompressedBufferLength(len));
+ fillBBWithRandom(compressed);
+ compressed.position(outOffset);
- final int compressedLength = compressor.compress(src, compressed);
+ compressor.compress(src, compressed);
+ compressed.flip().position(outOffset);
final int restoreOffset = 5;
final byte[] restored = new byte[restoreOffset + len];
new Random().nextBytes(restored);
// need byte[] representation which direct buffers don't have
- byte[] compressedBytes = new byte[compressed.buffer.capacity()];
- ByteBufferUtil.arrayCopy(compressed.buffer, outOffset, compressedBytes, 0, compressed.buffer.capacity() - outOffset);
+ byte[] compressedBytes = new byte[compressed.capacity()];
+ ByteBufferUtil.arrayCopy(compressed, outOffset, compressedBytes, outOffset, compressed.capacity() - outOffset);
- final int decompressedLength = compressor.uncompress(compressedBytes, 0, compressedLength, restored, restoreOffset);
+ final int decompressedLength = compressor.uncompress(compressedBytes, outOffset, compressed.remaining(), restored, restoreOffset);
assertEquals(decompressedLength, len);
assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
@@ -128,27 +128,28 @@ public class CompressorTest
final int outOffset = 3;
byte[] garbage = new byte[outOffset + compressor.initialCompressedBufferLength(data.length)];
new Random().nextBytes(garbage);
- WrappedByteBuffer dest = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(data.length));
- dest.buffer.put(garbage);
- dest.buffer.clear();
- dest.buffer.position(outOffset);
+ ByteBuffer dest = makeBB(outOffset + compressor.initialCompressedBufferLength(data.length));
+ dest.put(garbage);
+ dest.clear();
+ dest.position(outOffset);
- final int compressedLength = compressor.compress(src, dest);
+ compressor.compress(src, dest);
+ int compressedLength = dest.position() - outOffset;
- FileChannel channel = new FileOutputStream(temp, false).getChannel();
- dest.buffer.clear();
- channel.write(dest.buffer);
+ FileChannel channel = FileChannel.open(temp.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
+ dest.clear();
+ channel.write(dest);
MappedByteBuffer mappedData = Files.map(temp);
- mappedData.position(outOffset);
- mappedData.limit(compressedLength + outOffset);
-
ByteBuffer result = makeBB(data.length + 100);
+ mappedData.position(outOffset).limit(outOffset + compressedLength);
- int length = compressor.uncompress(mappedData, result);
+ compressor.uncompress(mappedData, result);
+ channel.close();
+ result.flip();
- Assert.assertEquals(data.length, length);
- for (int i = 0; i < length; i++)
+ Assert.assertEquals(data.length, result.limit());
+ for (int i = 0; i < result.limit(); i++)
{
Assert.assertEquals("Decompression mismatch at byte "+i, data[i], result.get());
}
@@ -177,51 +178,71 @@ public class CompressorTest
private void testByteBuffers() throws IOException
{
- int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
- byte[] srcData = new byte[n];
- new Random().nextBytes(srcData);
-
- ByteBuffer src = makeBB(n);
- src.put(srcData, 0, n);
- src.flip();
-
- int outOffset = 5;
- ICompressor.WrappedByteBuffer compressed = makeWrappedBB(outOffset + compressor.initialCompressedBufferLength(srcData.length));
- byte[] garbage = new byte[compressed.buffer.capacity()];
- new Random().nextBytes(garbage);
- compressed.buffer.put(garbage);
- compressed.buffer.clear();
- compressed.buffer.position(outOffset);
-
- compressor.compress(src, compressed);
- compressed.buffer.flip();
- compressed.buffer.position(outOffset);
-
- ByteBuffer result = makeBB(outOffset + n);
- int decompressed = compressor.uncompress(compressed.buffer, result);
-
- assert decompressed == n;
- for (int i = 0; i < n; ++i)
- assert srcData[i] == result.get(i) : "Failed comparison on index: " + i + " with compressor: " + compressor.getClass().toString();
+ assert compressor.supports(BufferType.OFF_HEAP);
+ assert compressor.supports(compressor.preferredBufferType());
+
+ for (BufferType in: BufferType.values())
+ if (compressor.supports(in))
+ for (BufferType comp: BufferType.values())
+ if (compressor.supports(comp))
+ for (BufferType out: BufferType.values())
+ if (compressor.supports(out))
+ testByteBuffers(in, comp, out);
}
- private ByteBuffer makeBB(int size)
+ private void testByteBuffers(BufferType typeIn, BufferType typeComp, BufferType typeOut) throws IOException
{
- return compressor.useDirectOutputByteBuffers()
- ? ByteBuffer.allocateDirect(size)
- : ByteBuffer.allocate(size);
+ try
+ {
+ int n = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+ byte[] srcData = new byte[n];
+ new Random().nextBytes(srcData);
+
+ final int inOffset = 2;
+ ByteBuffer src = typeIn.allocate(inOffset + n + inOffset);
+ src.position(inOffset);
+ src.put(srcData, 0, n);
+ src.flip().position(inOffset);
+
+ int outOffset = 5;
+ ByteBuffer compressed = typeComp.allocate(outOffset + compressor.initialCompressedBufferLength(srcData.length) + outOffset);
+ byte[] garbage = new byte[compressed.capacity()];
+ new Random().nextBytes(garbage);
+ compressed.put(garbage);
+ compressed.position(outOffset).limit(compressed.capacity() - outOffset);
+
+ compressor.compress(src, compressed);
+ assertEquals(inOffset + n, src.position());
+ assertEquals(inOffset + n, src.limit());
+ assertEquals(compressed.capacity() - outOffset, compressed.limit());
+ compressed.flip().position(outOffset);
+ int len = compressed.remaining();
+
+ ByteBuffer result = typeOut.allocate(inOffset + n + inOffset);
+ result.position(inOffset).limit(result.capacity() - inOffset);
+ compressor.uncompress(compressed, result);
+ assertEquals(outOffset + len, compressed.position());
+ assertEquals(outOffset + len, compressed.limit());
+ assertEquals(result.capacity() - inOffset, result.limit());
+
+ int decompressed = result.position() - inOffset;
+ assert decompressed == n : "Failed uncompressed size";
+ for (int i = 0; i < n; ++i)
+ assert srcData[i] == result.get(inOffset + i) : "Failed comparison on index: " + i;
+ }
+ catch (Throwable e)
+ {
+ throw new AssertionError("Failed testing compressor " + compressor.getClass().getSimpleName() + " with buffer types in:" + typeIn + " compressed:" + typeComp + " out:" + typeOut, e);
+ }
}
- private WrappedByteBuffer makeWrappedBB(int size)
+ private ByteBuffer makeBB(int size)
{
- return compressor.useDirectOutputByteBuffers()
- ? new WrappedByteBuffer(ByteBuffer.allocateDirect(size))
- : new WrappedByteBuffer(ByteBuffer.allocate(size));
+ return compressor.preferredBufferType().allocate(size);
}
private void fillBBWithRandom(ByteBuffer dest)
{
- ByteBuffer dupe = dest.duplicate();
byte[] random = new byte[dest.capacity()];
new Random().nextBytes(random);
dest.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index ec280fa..70993d3 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.ThreadLocalRandom;
import org.junit.Assert;
import org.junit.Test;
+
+import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.ByteBufferUtil;
public class DataOutputTest
@@ -176,7 +178,7 @@ public class DataOutputTest
public void testSequentialWriter() throws IOException
{
File file = FileUtils.createTempFile("dataoutput", "test");
- final SequentialWriter writer = new SequentialWriter(file, 32, false);
+ final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP);
DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
DataInput canon = testWrite(write);
write.flush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3adfd157/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index ef52030..ce0f918 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ThreadLocalRandom;
import org.junit.After;
import junit.framework.Assert;
+
+import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
import static org.apache.commons.io.FileUtils.*;
@@ -66,7 +68,7 @@ public class SequentialWriterTest extends AbstractTransactionalTest
protected TestableSW(File file) throws IOException
{
- this(file, new SequentialWriter(file, 8 << 10, true));
+ this(file, new SequentialWriter(file, 8 << 10, BufferType.OFF_HEAP));
}
protected TestableSW(File file, SequentialWriter sw) throws IOException