You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/01/16 15:39:53 UTC
cassandra git commit: Support direct buffer decompression for reads
Repository: cassandra
Updated Branches:
refs/heads/trunk 3dd9c38a1 -> 895ec3ea1
Support direct buffer decompression for reads
Patch by tjake; reviewed by Branimir Lambov for CASSANDRA-8464
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/895ec3ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/895ec3ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/895ec3ea
Branch: refs/heads/trunk
Commit: 895ec3ea1e68b74ee025317f57a09c1ef0b512dd
Parents: 3dd9c38
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri Jan 16 09:37:06 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Jan 16 09:38:42 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra-env.sh | 1 +
.../compress/CompressedRandomAccessReader.java | 179 +++++++++++++++++--
.../io/compress/CompressionMetadata.java | 11 +-
.../io/compress/DeflateCompressor.java | 17 ++
.../cassandra/io/compress/ICompressor.java | 12 ++
.../cassandra/io/compress/LZ4Compressor.java | 43 ++++-
.../cassandra/io/compress/SnappyCompressor.java | 13 ++
.../cassandra/io/sstable/format/Version.java | 2 -
.../io/sstable/format/big/BigFormat.java | 20 +--
.../org/apache/cassandra/io/util/FileUtils.java | 5 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 1 +
.../cassandra/io/util/RandomAccessReader.java | 16 +-
.../compress/CompressedInputStream.java | 15 +-
.../compress/CompressedStreamReader.java | 2 +-
.../org/apache/cassandra/utils/FBUtilities.java | 67 +++++++
.../CompressedRandomAccessReaderTest.java | 6 +-
.../cassandra/io/compress/CompressorTest.java | 133 ++++++++++++++
.../io/compress/LZ4CompressorTest.java | 84 ---------
.../compress/CompressedInputStreamTest.java | 2 +-
20 files changed, 474 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27b511a..79181e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Support direct buffer decompression for reads (CASSANDRA-8464)
* DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
* Add role based access control (CASSANDRA-7653)
* Group sstables for anticompaction correctly (CASSANDRA-8578)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/conf/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/conf/cassandra-env.sh b/conf/cassandra-env.sh
index 58605ca..f9641ed 100644
--- a/conf/cassandra-env.sh
+++ b/conf/cassandra-env.sh
@@ -251,6 +251,7 @@ fi
# uncomment to have Cassandra JVM log internal method compilation (developers only)
# JVM_OPTS="$JVM_OPTS -XX:+UnlockDiagnosticVMOptions -XX:+LogCompilation"
+# JVM_OPTS="$JVM_OPTS -XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index dca5ade..57abba9 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,14 +19,22 @@ package org.apache.cassandra.io.compress;
import java.io.*;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.Adler32;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.CompressedPoolingSegmentedFile;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.PoolingSegmentedFile;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
@@ -37,6 +45,8 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public class CompressedRandomAccessReader extends RandomAccessReader
{
+ private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
+
public static CompressedRandomAccessReader open(String path, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner)
{
try
@@ -61,33 +71,96 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
}
+ private TreeMap<Long, MappedByteBuffer> chunkSegments;
+ private int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+
private final CompressionMetadata metadata;
// we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer.
private ByteBuffer compressed;
// re-use single crc object
- private final Checksum checksum;
+ private final Adler32 checksum;
// raw checksum bytes
- private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
+ private ByteBuffer checksumBytes;
protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
{
- super(new File(dataFilePath), metadata.chunkLength(), owner);
+ super(new File(dataFilePath), metadata.chunkLength(), metadata.compressor().useDirectOutputByteBuffers(), owner);
this.metadata = metadata;
- checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+ checksum = new Adler32();
+
+ if (!useMmap)
+ {
+ compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+ checksumBytes = ByteBuffer.wrap(new byte[4]);
+ }
+ else
+ {
+ try
+ {
+ createMappedSegments();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ }
+
+ private void createMappedSegments() throws IOException
+ {
+ chunkSegments = new TreeMap<>();
+ long offset = 0;
+ long lastSegmentOffset = 0;
+ long segmentSize = 0;
+
+ while (offset < metadata.dataLength)
+ {
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
+
+ //Reached a new mmap boundary
+ if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
+ {
+ chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+ lastSegmentOffset += segmentSize;
+ segmentSize = 0;
+ }
+
+ segmentSize += chunk.length + 4; //checksum
+ offset += metadata.chunkLength();
+ }
+
+ if (segmentSize > 0)
+ chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
}
- protected ByteBuffer allocateBuffer(int bufferSize)
+ protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
{
assert Integer.bitCount(bufferSize) == 1;
- return ByteBuffer.allocate(bufferSize);
+ return useMmap && useDirect
+ ? ByteBuffer.allocateDirect(bufferSize)
+ : ByteBuffer.allocate(bufferSize);
}
@Override
- protected void reBuffer()
+ public void deallocate()
+ {
+ super.deallocate();
+
+ if (chunkSegments != null)
+ {
+ for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
+ {
+ FileUtils.clean(entry.getValue());
+ }
+ }
+
+ chunkSegments = null;
+ }
+
+ private void reBufferStandard()
{
try
{
@@ -126,14 +199,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
- if (metadata.hasPostCompressionAdlerChecksums)
- {
- checksum.update(compressed.array(), 0, chunk.length);
- }
- else
- {
- checksum.update(buffer.array(), 0, decompressedBytes);
- }
+ checksum.update(compressed.array(), 0, chunk.length);
if (checksum(chunk) != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunk);
@@ -156,6 +222,81 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
}
+ private void reBufferMmap()
+ {
+ try
+ {
+ long position = current();
+ assert position < metadata.dataLength;
+
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+ Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
+ long segmentOffset = entry.getKey();
+ int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+ MappedByteBuffer compressedChunk = entry.getValue();
+
+ compressedChunk.position(chunkOffset);
+ compressedChunk.limit(chunkOffset + chunk.length);
+ compressedChunk.mark();
+
+ buffer.clear();
+ int decompressedBytes;
+ try
+ {
+ decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer);
+ buffer.limit(decompressedBytes);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunk);
+ }
+ finally
+ {
+ compressedChunk.limit(compressedChunk.capacity());
+ }
+
+ if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ {
+ compressedChunk.reset();
+ compressedChunk.limit(chunkOffset + chunk.length);
+
+ FBUtilities.directCheckSum(checksum, compressedChunk);
+
+ compressedChunk.limit(compressedChunk.capacity());
+
+
+ if (compressedChunk.getInt() != (int) checksum.getValue())
+ throw new CorruptBlockException(getPath(), chunk);
+
+ // reset checksum object back to the original (blank) state
+ checksum.reset();
+ }
+
+ // buffer offset is always aligned
+ bufferOffset = position & ~(buffer.capacity() - 1);
+ buffer.position((int) (position - bufferOffset));
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, getPath());
+ }
+
+ }
+
+ @Override
+ protected void reBuffer()
+ {
+ if (useMmap)
+ {
+ reBufferMmap();
+ }
+ else
+ {
+ reBufferStandard();
+ }
+ }
+
private int checksum(CompressionMetadata.Chunk chunk) throws IOException
{
assert channel.position() == chunk.offset + chunk.length;
@@ -167,7 +308,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
public int getTotalBufferSize()
{
- return super.getTotalBufferSize() + compressed.capacity();
+ return super.getTotalBufferSize() + (useMmap ? 0 : compressed.capacity());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 57d7cbe..6139a5c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -62,7 +62,6 @@ public class CompressionMetadata
{
public final long dataLength;
public final long compressedFileLength;
- public final boolean hasPostCompressionAdlerChecksums;
private final Memory chunkOffsets;
private final long chunkOffsetsSize;
public final String indexFilePath;
@@ -82,14 +81,13 @@ public class CompressionMetadata
public static CompressionMetadata create(String dataFilePath)
{
Descriptor desc = Descriptor.fromFilename(dataFilePath);
- return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums());
+ return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
}
@VisibleForTesting
- CompressionMetadata(String indexFilePath, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+ CompressionMetadata(String indexFilePath, long compressedLength)
{
this.indexFilePath = indexFilePath;
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
DataInputStream stream;
try
@@ -137,13 +135,12 @@ public class CompressionMetadata
this.chunkOffsetsSize = chunkOffsets.size();
}
- private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+ private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength)
{
this.indexFilePath = filePath;
this.parameters = parameters;
this.dataLength = dataLength;
this.compressedFileLength = compressedLength;
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.chunkOffsets = offsets;
offsets.reference();
this.chunkOffsetsSize = offsetsSize;
@@ -342,7 +339,7 @@ public class CompressionMetadata
default:
throw new AssertionError();
}
- return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
+ return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index 125a08f..546b506 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,7 +17,10 @@
*/
package org.apache.cassandra.io.compress;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -113,4 +116,18 @@ public class DeflateCompressor implements ICompressor
throw new IOException(e);
}
}
+
+ public int uncompress(ByteBuffer input_, ByteBuffer output) throws IOException
+ {
+ if (!output.hasArray())
+ throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");
+
+ byte[] input = ByteBufferUtil.getArray(input_);
+ return uncompress(input, 0, input.length, output.array(), output.arrayOffset() + output.position());
+ }
+
+ public boolean useDirectOutputByteBuffers()
+ {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/ICompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/ICompressor.java b/src/java/org/apache/cassandra/io/compress/ICompressor.java
index be76bc5..81d1425 100644
--- a/src/java/org/apache/cassandra/io/compress/ICompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ICompressor.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.compress;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Set;
public interface ICompressor
@@ -28,6 +29,17 @@ public interface ICompressor
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException;
+ /**
+ * Decompression for DirectByteBuffers
+ */
+ public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException;
+
+ /**
+ * Notifies the caller whether this compressor wants/requires direct byte buffers
+ * as output when decompressing direct byte buffers
+ */
+ public boolean useDirectOutputByteBuffers();
+
public Set<String> supportedOptions();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 0cf36c1..f458cb6 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.compress;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
@@ -25,10 +26,10 @@ import java.util.Set;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
+import org.apache.cassandra.utils.FastByteOperations;
public class LZ4Compressor implements ICompressor
{
-
private static final int INTEGER_BYTES = 4;
private static final LZ4Compressor instance = new LZ4Compressor();
@@ -38,13 +39,13 @@ public class LZ4Compressor implements ICompressor
}
private final net.jpountz.lz4.LZ4Compressor compressor;
- private final net.jpountz.lz4.LZ4Decompressor decompressor;
+ private final net.jpountz.lz4.LZ4FastDecompressor decompressor;
private LZ4Compressor()
{
final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
compressor = lz4Factory.fastCompressor();
- decompressor = lz4Factory.decompressor();
+ decompressor = lz4Factory.fastDecompressor();
}
public int initialCompressedBufferLength(int chunkLength)
@@ -97,8 +98,42 @@ public class LZ4Compressor implements ICompressor
return decompressedLength;
}
+ public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ {
+ int pos = input.position();
+ final int decompressedLength = (input.get(pos) & 0xFF)
+ | ((input.get(pos + 1) & 0xFF) << 8)
+ | ((input.get(pos + 2) & 0xFF) << 16)
+ | ((input.get(pos + 3) & 0xFF) << 24);
+
+ int inputLength = input.remaining() - INTEGER_BYTES;
+
+ final int compressedLength;
+ try
+ {
+ compressedLength = decompressor.decompress(input, input.position() + INTEGER_BYTES, output, output.position(), decompressedLength);
+ }
+ catch (LZ4Exception e)
+ {
+ throw new IOException(e);
+ }
+
+ if (compressedLength != inputLength)
+ {
+ throw new IOException("Compressed lengths mismatch: "+compressedLength+" vs "+inputLength);
+ }
+
+ return decompressedLength;
+ }
+
+ @Override
+ public boolean useDirectOutputByteBuffers()
+ {
+ return false;
+ }
+
public Set<String> supportedOptions()
{
- return new HashSet<String>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
+ return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index 3583201..f5a2062 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -18,10 +18,12 @@
package org.apache.cassandra.io.compress;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
@@ -95,4 +97,15 @@ public class SnappyCompressor implements ICompressor
{
return Snappy.rawUncompress(input, inputOffset, inputLength, output, outputOffset);
}
+
+ public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+ {
+ return Snappy.uncompress(input, output);
+ }
+
+ @Override
+ public boolean useDirectOutputByteBuffers()
+ {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 5da0cb8..faaa89e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -40,8 +40,6 @@ public abstract class Version
public abstract boolean isLatestVersion();
- public abstract boolean hasPostCompressionAdlerChecksums();
-
public abstract boolean hasSamplingLevel();
public abstract boolean hasNewStatsFile();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index eb43968..e1a5622 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -126,16 +126,8 @@ public class BigFormat implements SSTableFormat
static class BigVersion extends Version
{
public static final String current_version = "la";
- public static final String earliest_supported_version = "ja";
-
- // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
- // this is mostly a marker to know if we should expect super columns or not. We do need
- // a major version bump however, because we should not allow streaming of super columns
- // into this new format)
- // tracks max local deletiontime in sstable metadata
- // records bloom_filter_fp_chance in metadata component
- // remove data size and column count from data file (CASSANDRA-4180)
- // tracks max/min column values (according to comparator)
+ public static final String earliest_supported_version = "jb";
+
// jb (2.0.1): switch from crc32 to adler32 for compression checksums
// checksum the compressed data
// ka (2.1.0): new Statistics.db file format
@@ -145,7 +137,6 @@ public class BigFormat implements SSTableFormat
// la (3.0.0): new file name format
private final boolean isLatestVersion;
- private final boolean hasPostCompressionAdlerChecksums;
private final boolean hasSamplingLevel;
private final boolean newStatsFile;
private final boolean hasAllAdlerChecksums;
@@ -158,7 +149,6 @@ public class BigFormat implements SSTableFormat
super(instance,version);
isLatestVersion = version.compareTo(current_version) == 0;
- hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
hasSamplingLevel = version.compareTo("ka") >= 0;
newStatsFile = version.compareTo("ka") >= 0;
hasAllAdlerChecksums = version.compareTo("ka") >= 0;
@@ -174,12 +164,6 @@ public class BigFormat implements SSTableFormat
}
@Override
- public boolean hasPostCompressionAdlerChecksums()
- {
- return hasPostCompressionAdlerChecksums;
- }
-
- @Override
public boolean hasSamplingLevel()
{
return hasSamplingLevel;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 080caa5..837cc6a 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -278,9 +278,10 @@ public class FileUtils
return canCleanDirectBuffers;
}
- public static void clean(MappedByteBuffer buffer)
+ public static void clean(ByteBuffer buffer)
{
- ((DirectBuffer) buffer).cleaner().clean();
+ if (isCleanerAvailable() && buffer.isDirect())
+ ((DirectBuffer)buffer).cleaner().clean();
}
public static void createDirectory(String directory)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index bf120a3..6f2def0 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import sun.nio.ch.DirectBuffer;
public class MmappedSegmentedFile extends SegmentedFile
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 58205d8..6bff378 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -53,6 +53,11 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException
{
+ this(file, bufferSize, false, owner);
+ }
+
+ protected RandomAccessReader(File file, int bufferSize, boolean useDirectBuffer, PoolingSegmentedFile owner) throws FileNotFoundException
+ {
this.owner = owner;
filePath = file.getAbsolutePath();
@@ -79,13 +84,16 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
{
throw new FSReadError(e, filePath);
}
- buffer = allocateBuffer(bufferSize);
+ buffer = allocateBuffer(bufferSize, useDirectBuffer);
buffer.limit(0);
}
- protected ByteBuffer allocateBuffer(int bufferSize)
+ protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirectBuffer)
{
- return ByteBuffer.allocate((int) Math.min(fileLength, bufferSize));
+ int size = (int) Math.min(fileLength, bufferSize);
+ return useDirectBuffer
+ ? ByteBuffer.allocateDirect(size)
+ : ByteBuffer.allocate(size);
}
public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
@@ -239,6 +247,8 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
public void deallocate()
{
bufferOffset += buffer.position();
+ FileUtils.clean(buffer);
+
buffer = null; // makes sure we don't use this after it's ostensibly closed
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..54f6eda 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -62,17 +62,15 @@ public class CompressedInputStream extends InputStream
private static final byte[] POISON_PILL = new byte[0];
private long totalCompressedBytesRead;
- private final boolean hasPostCompressionAdlerChecksums;
/**
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
+ public CompressedInputStream(InputStream source, CompressionInfo info)
{
this.info = info;
- this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.checksum = new Adler32();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
@@ -117,14 +115,7 @@ public class CompressedInputStream extends InputStream
// validate crc randomly
if (info.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
- if (hasPostCompressionAdlerChecksums)
- {
- checksum.update(compressed, 0, compressed.length - checksumBytes.length);
- }
- else
- {
- checksum.update(buffer, 0, validBufferBytes);
- }
+ checksum.update(compressed, 0, compressed.length - checksumBytes.length);
System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0595e0c..46f7d4f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -76,7 +76,7 @@ public class CompressedStreamReader extends StreamReader
SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums());
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 0462e5e..c9024ec 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
import java.io.*;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.math.BigInteger;
import java.net.*;
import java.nio.ByteBuffer;
@@ -26,10 +28,12 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
+import java.util.zip.Adler32;
import java.util.zip.Checksum;
import com.google.common.base.Joiner;
import com.google.common.collect.AbstractIterator;
+import com.google.common.primitives.Ints;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +48,8 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.IAllocator;
@@ -634,6 +640,67 @@ public class FBUtilities
checksum.update((v >>> 0) & 0xFF);
}
+ private static Method directUpdate;
+ static
+ {
+ try
+ {
+ directUpdate = Adler32.class.getDeclaredMethod("update", new Class[]{ByteBuffer.class});
+ directUpdate.setAccessible(true);
+ } catch (NoSuchMethodException e)
+ {
+ logger.warn("JVM doesn't support Adler32 byte buffer access");
+ directUpdate = null;
+ }
+ }
+
+ private static final ThreadLocal<byte[]> localDigestBuffer = new ThreadLocal<byte[]>()
+ {
+ @Override
+ protected byte[] initialValue()
+ {
+ return new byte[CompressionParameters.DEFAULT_CHUNK_LENGTH];
+ }
+ };
+
+ //Java 7 has this method but it's private till Java 8. Thanks JDK!
+ public static boolean supportsDirectChecksum()
+ {
+ return directUpdate != null;
+ }
+
+ public static void directCheckSum(Adler32 checksum, ByteBuffer bb)
+ {
+ if (directUpdate != null)
+ {
+ try
+ {
+ directUpdate.invoke(checksum, bb);
+ return;
+ } catch (IllegalAccessException e)
+ {
+ directUpdate = null;
+ logger.warn("JVM doesn't support Adler32 byte buffer access");
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ //Fallback
+ byte[] buffer = localDigestBuffer.get();
+
+ int remaining;
+ while ((remaining = bb.remaining()) > 0)
+ {
+ remaining = Math.min(remaining, buffer.length);
+ ByteBufferUtil.arrayCopy(bb, bb.position(), buffer, 0, remaining);
+ bb.position(bb.position() + remaining);
+ checksum.update(buffer, 0, remaining);
+ }
+ }
+
public static long abs(long index)
{
long negbit = index >> 63;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 900abd8..58bf5cb 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -80,7 +80,7 @@ public class CompressedRandomAccessReaderTest
writer.write("x".getBytes());
writer.close();
- CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true));
+ CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()));
String res = reader.readLine();
assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
assertEquals(40, res.length());
@@ -123,7 +123,7 @@ public class CompressedRandomAccessReaderTest
assert f.exists();
RandomAccessReader reader = compressed
- ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true))
+ ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()))
: RandomAccessReader.open(f);
String expected = "The quick brown fox jumps over the lazy dog";
assertEquals(expected.length(), reader.length());
@@ -160,7 +160,7 @@ public class CompressedRandomAccessReaderTest
writer.close();
// open compression metadata and get chunk information
- CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), true);
+ CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
CompressionMetadata.Chunk chunk = meta.chunkFor(0);
RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
new file mode 100644
index 0000000..04396e0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Random;
+
+import com.google.common.io.Files;
+import org.apache.cassandra.io.compress.ICompressor.WrappedArray;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class CompressorTest
+{
+ ICompressor compressor;
+
+ ICompressor[] compressors = new ICompressor[] {
+ LZ4Compressor.create(Collections.<String, String>emptyMap()),
+ DeflateCompressor.create(Collections.<String, String>emptyMap()),
+ SnappyCompressor.create(Collections.<String, String>emptyMap())
+ };
+
+
+ @Test
+ public void testAllCompressors() throws IOException
+ {
+ for (ICompressor compressor : compressors)
+ {
+ this.compressor = compressor;
+
+ testEmptyArray();
+ testLongArray();
+ testShortArray();
+ testMappedFile();
+ }
+ }
+
+
+ public void test(byte[] data, int off, int len) throws IOException
+ {
+ final int outOffset = 3;
+ final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(len)]);
+ new Random().nextBytes(out.buffer);
+ final int compressedLength = compressor.compress(data, off, len, out, outOffset);
+ final int restoredOffset = 5;
+ final byte[] restored = new byte[restoredOffset + len];
+ new Random().nextBytes(restored);
+ final int decompressedLength = compressor.uncompress(out.buffer, outOffset, compressedLength, restored, restoredOffset);
+ assertEquals(decompressedLength, len);
+ assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
+ Arrays.copyOfRange(restored, restoredOffset, restoredOffset + decompressedLength));
+ }
+
+ public void test(byte[] data) throws IOException
+ {
+ test(data, 0, data.length);
+ }
+
+ public void testEmptyArray() throws IOException
+ {
+ test(new byte[0]);
+ }
+
+ public void testShortArray() throws UnsupportedEncodingException, IOException
+ {
+ test("Cassandra".getBytes("UTF-8"), 1, 7);
+ }
+
+ public void testLongArray() throws UnsupportedEncodingException, IOException
+ {
+ byte[] data = new byte[1 << 20];
+ test(data, 13, 1 << 19);
+ new Random(0).nextBytes(data);
+ test(data, 13, 1 << 19);
+ }
+
+ public void testMappedFile() throws IOException
+ {
+ byte[] data = new byte[1 << 20];
+ new Random().nextBytes(data);
+
+ //create a temp file
+ File temp = File.createTempFile("tempfile", ".tmp");
+ temp.deleteOnExit();
+
+ //Prepend some random bytes to the output and compress
+ final int outOffset = 3;
+ final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(data.length)]);
+ new Random().nextBytes(out.buffer);
+ final int compressedLength = compressor.compress(data, 0, data.length, out, outOffset);
+ Files.write(out.buffer, temp);
+
+ MappedByteBuffer mappedData = Files.map(temp);
+ mappedData.position(outOffset);
+ mappedData.limit(compressedLength+outOffset);
+
+
+ ByteBuffer result = compressor.useDirectOutputByteBuffers()
+ ? ByteBuffer.allocateDirect(data.length + 100)
+ : ByteBuffer.allocate(data.length + 100);
+
+ int length = compressor.uncompress(mappedData, result);
+
+ Assert.assertEquals(data.length, length);
+ for (int i = 0; i < length; i++)
+ {
+ Assert.assertEquals("Decompression mismatch at byte "+i, data[i], result.get());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java
deleted file mode 100644
index 56ffdf1..0000000
--- a/test/unit/org/apache/cassandra/io/compress/LZ4CompressorTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.compress;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Random;
-
-import org.apache.cassandra.io.compress.ICompressor.WrappedArray;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class LZ4CompressorTest
-{
-
- LZ4Compressor compressor;
-
- @Before
- public void setUp()
- {
- compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
- }
-
- public void test(byte[] data, int off, int len) throws IOException
- {
- final int outOffset = 3;
- final WrappedArray out = new WrappedArray(new byte[outOffset + compressor.initialCompressedBufferLength(len)]);
- new Random().nextBytes(out.buffer);
- final int compressedLength = compressor.compress(data, off, len, out, outOffset);
- final int restoredOffset = 5;
- final byte[] restored = new byte[restoredOffset + len];
- new Random().nextBytes(restored);
- final int decompressedLength = compressor.uncompress(out.buffer, outOffset, compressedLength, restored, restoredOffset);
- assertEquals(decompressedLength, len);
- assertArrayEquals(Arrays.copyOfRange(data, off, off + len),
- Arrays.copyOfRange(restored, restoredOffset, restoredOffset + decompressedLength));
- }
-
- public void test(byte[] data) throws IOException
- {
- test(data, 0, data.length);
- }
-
- @Test
- public void testEmptyArray() throws IOException
- {
- test(new byte[0]);
- }
-
- @Test
- public void testShortArray() throws UnsupportedEncodingException, IOException
- {
- test("Cassandra".getBytes("UTF-8"), 1, 7);
- }
-
- @Test
- public void testLongArray() throws UnsupportedEncodingException, IOException
- {
- byte[] data = new byte[1 << 20];
- test(data, 13, 1 << 19);
- new Random(0).nextBytes(data);
- test(data, 13, 1 << 19);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/895ec3ea/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 42a83a0..128ec3c 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -111,7 +111,7 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
DataInputStream in = new DataInputStream(input);
for (int i = 0; i < sections.size(); i++)