You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/08/09 00:09:39 UTC
[09/19] git commit: Switch from crc32 to adler32 for compressed
sstable checksums and change to checksum the post-compressed data
Switch from crc32 to adler32 for compressed sstable checksums
and change to checksum the post-compressed data
Patch by tjake; reviewed by jbellis for CASSANDRA-5862
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815b2382
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815b2382
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815b2382
Branch: refs/heads/trunk
Commit: 815b2382a34dc25f005c9cc1bde78e10bc5d2ae0
Parents: e911b76
Author: Jake Luciani <ja...@apache.org>
Authored: Thu Aug 8 17:21:40 2013 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Thu Aug 8 17:23:57 2013 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedRandomAccessReader.java | 14 ++++++++++++--
.../io/compress/CompressedSequentialWriter.java | 7 ++++---
.../io/compress/CompressionMetadata.java | 6 ++++--
.../apache/cassandra/io/sstable/Descriptor.java | 6 +++++-
.../compress/CompressedInputStream.java | 17 ++++++++++++++---
.../compress/CompressedStreamReader.java | 2 +-
test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes
.../CompressedRandomAccessReaderTest.java | 4 ++--
.../compress/CompressedInputStreamTest.java | 2 +-
10 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 249c343..3379a80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.0.1
* Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
* Log Merkle tree stats (CASSANDRA-2698)
+ * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
2.0.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 3269e4c..b6cffa2 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.compress;
import java.io.*;
import java.nio.ByteBuffer;
+import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@ -65,7 +66,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
private ByteBuffer compressed;
// re-use single crc object
- private final Checksum checksum = new CRC32();
+ private final Checksum checksum;
// raw checksum bytes
private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
@@ -74,6 +75,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
{
super(new File(dataFilePath), metadata.chunkLength(), owner);
this.metadata = metadata;
+ checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
}
@@ -122,7 +124,15 @@ public class CompressedRandomAccessReader extends RandomAccessReader
if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
{
- checksum.update(buffer, 0, validBufferBytes);
+
+ if (metadata.hasPostCompressionAdlerChecksums)
+ {
+ checksum.update(compressed.array(), 0, chunk.length);
+ }
+ else
+ {
+ checksum.update(buffer, 0, validBufferBytes);
+ }
if (checksum(chunk) != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunk);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 00eb5a7..386eca5 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.compress;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@ -55,7 +56,7 @@ public class CompressedSequentialWriter extends SequentialWriter
// holds a number of already written chunks
private int chunkCount = 0;
- private final Checksum checksum = new CRC32();
+ private final Checksum checksum = new Adler32();
private long originalSize = 0, compressedSize = 0;
@@ -126,7 +127,7 @@ public class CompressedSequentialWriter extends SequentialWriter
compressedSize += compressedLength;
// update checksum
- checksum.update(buffer, 0, validBufferBytes);
+ checksum.update(compressed.buffer, 0, compressedLength);
try
{
@@ -204,7 +205,7 @@ public class CompressedSequentialWriter extends SequentialWriter
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
}
- checksum.update(buffer, 0, validBytes);
+ checksum.update(compressed.buffer, 0, chunkSize);
if (out.readInt() != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 93b0091..b6d8e1b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -42,6 +42,7 @@ public class CompressionMetadata
{
public final long dataLength;
public final long compressedFileLength;
+ public final boolean hasPostCompressionAdlerChecksums;
private final Memory chunkOffsets;
public final String indexFilePath;
public final CompressionParameters parameters;
@@ -60,13 +61,14 @@ public class CompressionMetadata
public static CompressionMetadata create(String dataFilePath)
{
Descriptor desc = Descriptor.fromFilename(dataFilePath);
- return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
+ return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums);
}
@VisibleForTesting
- CompressionMetadata(String indexFilePath, long compressedLength)
+ CompressionMetadata(String indexFilePath, long compressedLength, boolean hasPostCompressionAdlerChecksums)
{
this.indexFilePath = indexFilePath;
+ this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
DataInputStream stream;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index ac1e55f..1b29c1c 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -44,7 +44,7 @@ public class Descriptor
public static class Version
{
// This needs to be at the begining for initialization sake
- public static final String current_version = "ja";
+ public static final String current_version = "jb";
// ic (1.2.5): omits per-row bloom filter of column names
// ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -55,6 +55,8 @@ public class Descriptor
// records bloom_filter_fp_chance in metadata component
// remove data size and column count from data file (CASSANDRA-4180)
// tracks max/min column values (according to comparator)
+ // jb (2.0.1): switch from crc32 to adler32 for compression checksums
+ // checksum the compressed data
public static final Version CURRENT = new Version(current_version);
@@ -67,6 +69,7 @@ public class Descriptor
public final boolean offHeapSummaries;
public final boolean hasRowSizeAndColumnCount;
public final boolean tracksMaxMinColumnNames;
+ public final boolean hasPostCompressionAdlerChecksums;
public Version(String version)
{
@@ -78,6 +81,7 @@ public class Descriptor
offHeapSummaries = version.compareTo("ja") >= 0;
hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
+ hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 3305f50..698c2fe 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@ -52,20 +53,23 @@ public class CompressedInputStream extends InputStream
// number of bytes in the buffer that are actually valid
protected int validBufferBytes = -1;
- private final Checksum checksum = new CRC32();
+ private final Checksum checksum;
// raw checksum bytes
private final byte[] checksumBytes = new byte[4];
private long totalCompressedBytesRead;
+ private final boolean hasPostCompressionAdlerChecksums;
/**
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info)
+ public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
{
this.info = info;
+ this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
+ this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
@@ -107,7 +111,14 @@ public class CompressedInputStream extends InputStream
// validate crc randomly
if (info.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
{
- checksum.update(buffer, 0, validBufferBytes);
+ if (hasPostCompressionAdlerChecksums)
+ {
+ checksum.update(compressed, 0, compressed.length - checksumBytes.length);
+ }
+ else
+ {
+ checksum.update(buffer, 0, validBufferBytes);
+ }
System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 44e4d5c..6f5d0f5 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -64,7 +64,7 @@ public class CompressedStreamReader extends StreamReader
SSTableWriter writer = createWriter(cfs, totalSize);
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index aa75378..ff6403c 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 8321ae6..ee32a0e 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -76,7 +76,7 @@ public class CompressedRandomAccessReaderTest
assert f.exists();
RandomAccessReader reader = compressed
- ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()))
+ ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true))
: RandomAccessReader.open(f);
String expected = "The quick brown fox jumps over the lazy dog";
assertEquals(expected.length(), reader.length());
@@ -113,7 +113,7 @@ public class CompressedRandomAccessReaderTest
writer.close();
// open compression metadata and get chunk information
- CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
+ CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), true);
CompressionMetadata.Chunk chunk = meta.chunkFor(0);
RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index ab311e6..027c84c 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -97,7 +97,7 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
DataInputStream in = new DataInputStream(input);
for (int i = 0; i < sections.size(); i++)