You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/08/09 00:09:39 UTC

[09/19] git commit: Switch from crc32 to adler32 for compressed sstable checksums and change to checksum the post-compressed data

Switch from crc32 to adler32 for compressed sstable checksums
and change to checksum the post-compressed data

Patch by tjake; reviewed by jbellis for CASSANDRA-5862


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815b2382
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815b2382
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815b2382

Branch: refs/heads/trunk
Commit: 815b2382a34dc25f005c9cc1bde78e10bc5d2ae0
Parents: e911b76
Author: Jake Luciani <ja...@apache.org>
Authored: Thu Aug 8 17:21:40 2013 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Thu Aug 8 17:23:57 2013 -0400

----------------------------------------------------------------------
 CHANGES.txt                                      |   1 +
 .../compress/CompressedRandomAccessReader.java   |  14 ++++++++++++--
 .../io/compress/CompressedSequentialWriter.java  |   7 ++++---
 .../io/compress/CompressionMetadata.java         |   6 ++++--
 .../apache/cassandra/io/sstable/Descriptor.java  |   6 +++++-
 .../compress/CompressedInputStream.java          |  17 ++++++++++++++---
 .../compress/CompressedStreamReader.java         |   2 +-
 test/data/serialization/2.0/db.RowMutation.bin   | Bin 3599 -> 3599 bytes
 .../CompressedRandomAccessReaderTest.java        |   4 ++--
 .../compress/CompressedInputStreamTest.java      |   2 +-
 10 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 249c343..3379a80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.0.1
  * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
  * Log Merkle tree stats (CASSANDRA-2698)
+ * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 
 
 2.0.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 3269e4c..b6cffa2 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.compress;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.util.zip.Adler32;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -65,7 +66,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     private ByteBuffer compressed;
 
     // re-use single crc object
-    private final Checksum checksum = new CRC32();
+    private final Checksum checksum;
 
     // raw checksum bytes
     private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
@@ -74,6 +75,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     {
         super(new File(dataFilePath), metadata.chunkLength(), owner);
         this.metadata = metadata;
+        checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
         compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
     }
 
@@ -122,7 +124,15 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
         if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
         {
-            checksum.update(buffer, 0, validBufferBytes);
+
+            if (metadata.hasPostCompressionAdlerChecksums)
+            {
+                checksum.update(compressed.array(), 0, chunk.length);
+            }
+            else
+            {
+                checksum.update(buffer, 0, validBufferBytes);
+            }
 
             if (checksum(chunk) != (int) checksum.getValue())
                 throw new CorruptBlockException(getPath(), chunk);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 00eb5a7..386eca5 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.compress;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.util.zip.Adler32;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -55,7 +56,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     // holds a number of already written chunks
     private int chunkCount = 0;
 
-    private final Checksum checksum = new CRC32();
+    private final Checksum checksum = new Adler32();
 
     private long originalSize = 0, compressedSize = 0;
 
@@ -126,7 +127,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         compressedSize += compressedLength;
 
         // update checksum
-        checksum.update(buffer, 0, validBufferBytes);
+        checksum.update(compressed.buffer, 0, compressedLength);
 
         try
         {
@@ -204,7 +205,7 @@ public class CompressedSequentialWriter extends SequentialWriter
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
             }
 
-            checksum.update(buffer, 0, validBytes);
+            checksum.update(compressed.buffer, 0, chunkSize);
 
             if (out.readInt() != (int) checksum.getValue())
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 93b0091..b6d8e1b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -42,6 +42,7 @@ public class CompressionMetadata
 {
     public final long dataLength;
     public final long compressedFileLength;
+    public final boolean hasPostCompressionAdlerChecksums;
     private final Memory chunkOffsets;
     public final String indexFilePath;
     public final CompressionParameters parameters;
@@ -60,13 +61,14 @@ public class CompressionMetadata
     public static CompressionMetadata create(String dataFilePath)
     {
         Descriptor desc = Descriptor.fromFilename(dataFilePath);
-        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
+        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums);
     }
 
     @VisibleForTesting
-    CompressionMetadata(String indexFilePath, long compressedLength)
+    CompressionMetadata(String indexFilePath, long compressedLength, boolean hasPostCompressionAdlerChecksums)
     {
         this.indexFilePath = indexFilePath;
+        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 
         DataInputStream stream;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index ac1e55f..1b29c1c 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -44,7 +44,7 @@ public class Descriptor
     public static class Version
     {
         // This needs to be at the begining for initialization sake
-        public static final String current_version = "ja";
+        public static final String current_version = "jb";
 
         // ic (1.2.5): omits per-row bloom filter of column names
         // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -55,6 +55,8 @@ public class Descriptor
         //             records bloom_filter_fp_chance in metadata component
         //             remove data size and column count from data file (CASSANDRA-4180)
         //             tracks max/min column values (according to comparator)
+        // jb (2.0.1): switch from crc32 to adler32 for compression checksums
+        //             checksum the compressed data
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -67,6 +69,7 @@ public class Descriptor
         public final boolean offHeapSummaries;
         public final boolean hasRowSizeAndColumnCount;
         public final boolean tracksMaxMinColumnNames;
+        public final boolean hasPostCompressionAdlerChecksums;
 
         public Version(String version)
         {
@@ -78,6 +81,7 @@ public class Descriptor
             offHeapSummaries = version.compareTo("ja") >= 0;
             hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
             tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
+            hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 3305f50..698c2fe 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.zip.Adler32;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -52,20 +53,23 @@ public class CompressedInputStream extends InputStream
     // number of bytes in the buffer that are actually valid
     protected int validBufferBytes = -1;
 
-    private final Checksum checksum = new CRC32();
+    private final Checksum checksum;
 
     // raw checksum bytes
     private final byte[] checksumBytes = new byte[4];
 
     private long totalCompressedBytesRead;
+    private final boolean hasPostCompressionAdlerChecksums;
 
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
      */
-    public CompressedInputStream(InputStream source, CompressionInfo info)
+    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
     {
         this.info = info;
+        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
+        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
@@ -107,7 +111,14 @@ public class CompressedInputStream extends InputStream
         // validate crc randomly
         if (info.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
         {
-            checksum.update(buffer, 0, validBufferBytes);
+            if (hasPostCompressionAdlerChecksums)
+            {
+                checksum.update(compressed, 0, compressed.length - checksumBytes.length);
+            }
+            else
+            {
+                checksum.update(buffer, 0, validBufferBytes);
+            }
 
             System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
             if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 44e4d5c..6f5d0f5 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -64,7 +64,7 @@ public class CompressedStreamReader extends StreamReader
 
         SSTableWriter writer = createWriter(cfs, totalSize);
 
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index aa75378..ff6403c 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 8321ae6..ee32a0e 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -76,7 +76,7 @@ public class CompressedRandomAccessReaderTest
 
             assert f.exists();
             RandomAccessReader reader = compressed
-                                      ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()))
+                                      ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true))
                                       : RandomAccessReader.open(f);
             String expected = "The quick brown fox jumps over the lazy dog";
             assertEquals(expected.length(), reader.length());
@@ -113,7 +113,7 @@ public class CompressedRandomAccessReaderTest
         writer.close();
 
         // open compression metadata and get chunk information
-        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
+        CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), true);
         CompressionMetadata.Chunk chunk = meta.chunkFor(0);
 
         RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index ab311e6..027c84c 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -97,7 +97,7 @@ public class CompressedInputStreamTest
 
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
-        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
         DataInputStream in = new DataInputStream(input);
 
         for (int i = 0; i < sections.size(); i++)