You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/08/10 18:16:24 UTC
cassandra git commit: Replace all usages of Adler32 with CRC32 which
has a fast instrinsic now
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 2fcfc7c58 -> 5baf28d09
Replace all usages of Adler32 with CRC32 which has a fast instrinsic now
The switch to adler happened across two versions depending on whether the data was compressed or uncompressed
Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-8684
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5baf28d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5baf28d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5baf28d0
Branch: refs/heads/cassandra-3.0
Commit: 5baf28d0935b7f112c499856b3bc00c722feb460
Parents: 2fcfc7c
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Fri Jul 31 11:56:50 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Mon Aug 10 12:14:40 2015 -0400
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/cache/AutoSavingCache.java | 2 +-
.../compress/CompressedRandomAccessReader.java | 10 ++--
.../io/compress/CompressedSequentialWriter.java | 4 +-
.../io/compress/CompressionMetadata.java | 14 +++--
.../apache/cassandra/io/sstable/Component.java | 18 ++++--
.../cassandra/io/sstable/format/Version.java | 6 +-
.../io/sstable/format/big/BigFormat.java | 38 +++++++++---
.../io/util/ChecksummedRandomAccessReader.java | 4 +-
.../io/util/DataIntegrityMetadata.java | 10 ++--
.../compress/CompressedInputStream.java | 6 +-
.../compress/CompressedStreamReader.java | 2 +-
.../apache/cassandra/utils/ChecksumType.java | 63 ++++++++++++++++++++
.../org/apache/cassandra/db/VerifyTest.java | 7 ++-
.../CompressedRandomAccessReaderTest.java | 8 +--
.../CompressedSequentialWriterTest.java | 4 +-
.../compression/CompressedInputStreamTest.java | 4 +-
17 files changed, 151 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c7d466a..f1ac423 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
+ * Replace all usages of Adler32 with CRC32
* Fix row deletion bug for Materialized Views (CASSANDRA-10014)
* Support mixed-version clusters with Cassandra 2.1 and 2.2 (CASSANDRA-9704)
* Fix multiple slices on RowSearchers (CASSANDRA-10002)
@@ -9,6 +10,7 @@
* Add transparent data encryption core classes (CASSANDRA-9945)
* Bytecode inspection for Java-UDFs (CASSANDRA-9890)
* Use byte to serialize MT hash length (CASSANDRA-9792)
+ * Replace usage of Adler32 with CRC32 (CASSANDRA-8684)
Merged from 2.2:
* Add checksum to saved cache files (CASSANDRA-9265)
* Log warning when using an aggregate without partition key (CASSANDRA-9737)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 3c5b6a5..2a838ab 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -62,7 +62,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
protected final CacheService.CacheType cacheType;
private final CacheSerializer<K, V> cacheLoader;
- private static final String CURRENT_VERSION = "c";
+ private static final String CURRENT_VERSION = "d";
private static volatile IStreamFactory streamFactory = new IStreamFactory()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 01b4655..c38f4d2 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.Adler32;
-
+import java.util.zip.Checksum;
import com.google.common.primitives.Ints;
@@ -58,7 +58,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
private ByteBuffer compressed;
// re-use single crc object
- private final Adler32 checksum;
+ private final Checksum checksum;
// raw checksum bytes
private ByteBuffer checksumBytes;
@@ -67,7 +67,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
{
super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType());
this.metadata = metadata;
- checksum = new Adler32();
+ checksum = metadata.checksumType.newInstance();
chunkSegments = file == null ? null : file.chunkSegments();
if (chunkSegments == null)
@@ -131,7 +131,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
compressed.rewind();
- checksum.update(compressed);
+ metadata.checksumType.update( checksum, (compressed));
if (checksum(chunk) != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunk);
@@ -193,7 +193,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
{
compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
- checksum.update(compressedChunk);
+ metadata.checksumType.update( checksum, compressedChunk);
compressedChunk.limit(compressedChunk.capacity());
if (compressedChunk.getInt() != (int) checksum.getValue())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index bc1e6f6..a4afa3f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -23,7 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
-import java.util.zip.Adler32;
+import java.util.zip.CRC32;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -204,7 +204,7 @@ public class CompressedSequentialWriter extends SequentialWriter
throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
}
- Adler32 checksum = new Adler32();
+ CRC32 checksum = new CRC32();
compressed.rewind();
checksum.update(compressed);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index bd6da2c..f5d8f7e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.io.util.SafeMemory;
import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Transactional;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -71,6 +72,7 @@ public class CompressionMetadata
private final long chunkOffsetsSize;
public final String indexFilePath;
public final CompressionParams parameters;
+ public final ChecksumType checksumType;
/**
* Create metadata about given compressed file including uncompressed data length, chunk size
@@ -86,13 +88,14 @@ public class CompressionMetadata
public static CompressionMetadata create(String dataFilePath)
{
Descriptor desc = Descriptor.fromFilename(dataFilePath);
- return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length());
+ return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.compressedChecksumType());
}
@VisibleForTesting
- CompressionMetadata(String indexFilePath, long compressedLength)
+ CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
{
this.indexFilePath = indexFilePath;
+ this.checksumType = checksumType;
try (DataInputStream stream = new DataInputStream(new FileInputStream(indexFilePath)))
{
@@ -131,7 +134,7 @@ public class CompressionMetadata
this.chunkOffsetsSize = chunkOffsets.size();
}
- private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength)
+ private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength, ChecksumType checksumType)
{
this.indexFilePath = filePath;
this.parameters = parameters;
@@ -139,6 +142,7 @@ public class CompressionMetadata
this.compressedFileLength = compressedLength;
this.chunkOffsets = offsets;
this.chunkOffsetsSize = offsetsSize;
+ this.checksumType = checksumType;
}
public ICompressor compressor()
@@ -380,7 +384,7 @@ public class CompressionMetadata
if (count < this.count)
compressedLength = offsets.getLong(count * 8L);
- return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
+ return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, ChecksumType.CRC32);
}
/**
@@ -398,7 +402,7 @@ public class CompressionMetadata
/**
* Reset the writer so that the next chunk offset written will be the
* one of {@code chunkIndex}.
- *
+ *
* @param chunkIndex the next index to write
*/
public void resetAndTruncate(int chunkIndex)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index a431f29..54dd35b 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -48,7 +48,7 @@ public class Component
// statistical metadata about the content of the sstable
STATS("Statistics.db"),
// holds adler32 checksum of the data file
- DIGEST("Digest.adler32"),
+ DIGEST(new String[] { "Digest.crc32", "Digest.adler32" }),
// holds the CRC32 for chunks in an a uncompressed file.
CRC("CRC.db"),
// holds SSTable Index Summary (sampling of Index component)
@@ -56,19 +56,25 @@ public class Component
// table of contents, stores the list of all components for the sstable
TOC("TOC.txt"),
// custom component, used by e.g. custom compaction strategy
- CUSTOM(null);
+ CUSTOM(new String[] { null });
- final String repr;
+ final String[] repr;
Type(String repr)
{
+ this(new String[] { repr });
+ }
+
+ Type(String[] repr)
+ {
this.repr = repr;
}
static Type fromRepresentation(String repr)
{
for (Type type : TYPES)
- if (repr.equals(type.repr))
- return type;
+ for (String representation : type.repr)
+ if (repr.equals(representation))
+ return type;
return CUSTOM;
}
}
@@ -90,7 +96,7 @@ public class Component
public Component(Type type)
{
- this(type, type.repr);
+ this(type, type.repr[0]);
assert type != Type.CUSTOM;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 10ceb24..9ef0b43 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.sstable.format;
import java.util.regex.Pattern;
+import org.apache.cassandra.utils.ChecksumType;
+
/**
* A set of feature flags associated with a SSTable format
*
@@ -48,7 +50,9 @@ public abstract class Version
public abstract boolean hasNewStatsFile();
- public abstract boolean hasAllAdlerChecksums();
+ public abstract ChecksumType compressedChecksumType();
+
+ public abstract ChecksumType uncompressedChecksumType();
public abstract boolean hasRepairedAt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 860cd9f..6df4b1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ChecksumType;
/**
* Legacy bigtable format
@@ -81,11 +82,11 @@ public class BigFormat implements SSTableFormat
static class WriterFactory extends SSTableWriter.Factory
{
@Override
- public SSTableWriter open(Descriptor descriptor,
- long keyCount,
- long repairedAt,
- CFMetaData metadata,
- MetadataCollector metadataCollector,
+ public SSTableWriter open(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ CFMetaData metadata,
+ MetadataCollector metadataCollector,
SerializationHeader header,
LifecycleTransaction txn)
{
@@ -126,7 +127,8 @@ public class BigFormat implements SSTableFormat
private final boolean isLatestVersion;
private final boolean hasSamplingLevel;
private final boolean newStatsFile;
- private final boolean hasAllAdlerChecksums;
+ private final ChecksumType compressedChecksumType;
+ private final ChecksumType uncompressedChecksumType;
private final boolean hasRepairedAt;
private final boolean tracksLegacyCounterShards;
private final boolean newFileName;
@@ -145,7 +147,19 @@ public class BigFormat implements SSTableFormat
isLatestVersion = version.compareTo(current_version) == 0;
hasSamplingLevel = version.compareTo("ka") >= 0;
newStatsFile = version.compareTo("ka") >= 0;
- hasAllAdlerChecksums = version.compareTo("ka") >= 0;
+
+ //For a while Adler32 was in use, now the CRC32 instrinsic is very good especially after Haswell
+ //PureJavaCRC32 was always faster than Adler32. See CASSANDRA-8684
+ ChecksumType checksumType = ChecksumType.CRC32;
+ if (version.compareTo("ka") >= 0 && version.compareTo("ma") < 0)
+ checksumType = ChecksumType.Adler32;
+ this.uncompressedChecksumType = checksumType;
+
+ checksumType = ChecksumType.CRC32;
+ if (version.compareTo("jb") >= 0 && version.compareTo("ma") < 0)
+ checksumType = ChecksumType.Adler32;
+ this.compressedChecksumType = checksumType;
+
hasRepairedAt = version.compareTo("ka") >= 0;
tracksLegacyCounterShards = version.compareTo("ka") >= 0;
@@ -177,9 +191,15 @@ public class BigFormat implements SSTableFormat
}
@Override
- public boolean hasAllAdlerChecksums()
+ public ChecksumType compressedChecksumType()
+ {
+ return compressedChecksumType;
+ }
+
+ @Override
+ public ChecksumType uncompressedChecksumType()
{
- return hasAllAdlerChecksums;
+ return uncompressedChecksumType;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 976ff23..3fc247b 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.io.util;
import java.io.File;
import java.io.IOException;
-import java.util.zip.Adler32;
+import java.util.zip.CRC32;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,7 +52,7 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
{
ChannelProxy channel = new ChannelProxy(file);
RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
- DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
+ DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new CRC32(),
crcReader,
file.getPath());
return new ChecksummedRandomAccessReader(file, channel, validator);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index ac2ab47..70cd860 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -25,7 +25,6 @@ import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
@@ -35,7 +34,6 @@ import com.google.common.base.Charsets;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.utils.FBUtilities;
public class DataIntegrityMetadata
{
@@ -53,7 +51,7 @@ public class DataIntegrityMetadata
public ChecksumValidator(Descriptor descriptor) throws IOException
{
- this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32(),
+ this(descriptor.version.uncompressedChecksumType().newInstance(),
RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
descriptor.filenameFor(Component.DATA));
}
@@ -110,7 +108,7 @@ public class DataIntegrityMetadata
public FileDigestValidator(Descriptor descriptor) throws IOException
{
this.descriptor = descriptor;
- checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32();
+ checksum = descriptor.version.uncompressedChecksumType().newInstance();
digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST)));
dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA)));
try
@@ -154,9 +152,9 @@ public class DataIntegrityMetadata
public static class ChecksumWriter
{
- private final Adler32 incrementalChecksum = new Adler32();
+ private final CRC32 incrementalChecksum = new CRC32();
private final DataOutput incrementalOut;
- private final Adler32 fullChecksum = new Adler32();
+ private final CRC32 fullChecksum = new CRC32();
public ChecksumWriter(DataOutput incrementalOut)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 099fd14..0a118b2 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -24,13 +24,13 @@ import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.zip.Adler32;
import java.util.zip.Checksum;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@ -65,10 +65,10 @@ public class CompressedInputStream extends InputStream
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info)
+ public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType)
{
this.info = info;
- this.checksum = new Adler32();
+ this.checksum = checksumType.newInstance();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 47832f0..205291b 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -77,7 +77,7 @@ public class CompressedStreamReader extends StreamReader
SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType());
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/src/java/org/apache/cassandra/utils/ChecksumType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java b/src/java/org/apache/cassandra/utils/ChecksumType.java
new file mode 100644
index 0000000..c9a1eb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ChecksumType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+import java.util.zip.Adler32;
+
+public enum ChecksumType
+{
+ Adler32()
+ {
+
+ @Override
+ public Checksum newInstance()
+ {
+ return new Adler32();
+ }
+
+ @Override
+ public void update(Checksum checksum, ByteBuffer buf)
+ {
+ ((Adler32)checksum).update(buf);
+ }
+
+ },
+ CRC32()
+ {
+
+ @Override
+ public Checksum newInstance()
+ {
+ return new CRC32();
+ }
+
+ @Override
+ public void update(Checksum checksum, ByteBuffer buf)
+ {
+ ((CRC32)checksum).update(buf);
+ }
+
+ };
+
+ public abstract Checksum newInstance();
+
+ public abstract void update(Checksum checksum, ByteBuffer buf);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 3bd4a47..13ce0c1 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import com.google.common.base.Charsets;
+
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
@@ -43,7 +44,7 @@ import org.junit.runner.RunWith;
import java.io.*;
import java.nio.file.Files;
-import java.util.zip.Adler32;
+import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import static org.junit.Assert.fail;
@@ -371,8 +372,8 @@ public class VerifyTest
protected long simpleFullChecksum(String filename) throws IOException
{
FileInputStream inputStream = new FileInputStream(filename);
- Adler32 adlerChecksum = new Adler32();
- CheckedInputStream cinStream = new CheckedInputStream(inputStream, adlerChecksum);
+ CRC32 checksum = new CRC32();
+ CheckedInputStream cinStream = new CheckedInputStream(inputStream, checksum);
byte[] b = new byte[128];
while (cinStream.read(b) >= 0) {
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index cc76a9e..8f94cf2 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -24,7 +24,6 @@ import java.io.RandomAccessFile;
import java.util.Random;
import org.junit.Test;
-
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -35,6 +34,7 @@ import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.SyncUtil;
import static org.junit.Assert.assertEquals;
@@ -84,7 +84,7 @@ public class CompressedRandomAccessReaderTest
writer.write("x".getBytes());
writer.finish();
- CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
+ CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
String res = reader.readLine();
assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
assertEquals(40, res.length());
@@ -129,7 +129,7 @@ public class CompressedRandomAccessReaderTest
assert f.exists();
RandomAccessReader reader = compressed
- ? CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()))
+ ? CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
: RandomAccessReader.open(f);
String expected = "The quick brown fox jumps over the lazy dog";
assertEquals(expected.length(), reader.length());
@@ -171,7 +171,7 @@ public class CompressedRandomAccessReaderTest
ChannelProxy channel = new ChannelProxy(file);
// open compression metadata and get chunk information
- CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
+ CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
CompressionMetadata.Chunk chunk = meta.chunkFor(0);
RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index db99317..28af0ae 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -31,6 +31,7 @@ import org.junit.After;
import org.junit.Test;
import junit.framework.Assert;
+
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
@@ -41,6 +42,7 @@ import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriterTest;
import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
public class CompressedSequentialWriterTest extends SequentialWriterTest
{
@@ -115,7 +117,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
}
assert f.exists();
- RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
+ RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
assertEquals(dataPre.length + rawPost.length, reader.length());
byte[] result = new byte[(int)reader.length()];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5baf28d0/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 37aea91..e3014c3 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -21,7 +21,6 @@ import java.io.*;
import java.util.*;
import org.junit.Test;
-
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
@@ -32,6 +31,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.Pair;
/**
@@ -108,7 +108,7 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32);
DataInputStream in = new DataInputStream(input);
for (int i = 0; i < sections.size(); i++)