You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/19 12:49:49 UTC
[2/3] cassandra git commit: Moved crc_check_chance out of compression options
Moved crc_check_chance out of compression options
Patch by Paulo Motta; reviewed by Sam Tunnicliffe for CASSANDRA-9839
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e25453ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e25453ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e25453ba
Branch: refs/heads/trunk
Commit: e25453ba55d35cac802719c6fb906460f13a9c36
Parents: aef7169
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Sep 2 19:24:47 2015 -0300
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Sat Sep 19 11:38:55 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 2 +
bin/cqlsh.py | 2 +-
...ore-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar | Bin 2217119 -> 0 bytes
...ore-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar | Bin 0 -> 2282233 bytes
...iver-internal-only-3.0.0a2.post0-03085e6.zip | Bin 230830 -> 0 bytes
...iver-internal-only-3.0.0a2.post0-379b6f2.zip | Bin 0 -> 230837 bytes
.../org/apache/cassandra/config/CFMetaData.java | 9 +-
.../cql3/statements/TableAttributes.java | 26 ++++++
.../apache/cassandra/db/ColumnFamilyStore.java | 61 ++++++++------
.../compress/CompressedRandomAccessReader.java | 10 ++-
.../cassandra/io/compress/LZ4Compressor.java | 2 +-
.../io/sstable/format/SSTableReader.java | 17 ++--
.../cassandra/io/util/IChecksummedFile.java | 27 ++++++
.../cassandra/io/util/ICompressedFile.java | 3 +-
.../apache/cassandra/io/util/SegmentedFile.java | 15 +++-
.../cassandra/schema/CompressionParams.java | 78 +++++------------
.../cassandra/schema/LegacySchemaMigrator.java | 8 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 4 +
.../apache/cassandra/schema/TableParams.java | 24 +++++-
.../compress/CompressedInputStream.java | 7 +-
.../compress/CompressedStreamReader.java | 3 +-
.../apache/cassandra/utils/DefaultInteger.java | 51 -----------
.../apache/cassandra/utils/DefaultValue.java | 51 +++++++++++
.../miscellaneous/CrcCheckChanceTest.java | 84 ++++++++++++-------
.../compression/CompressedInputStreamTest.java | 3 +-
26 files changed, 308 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2c0cde2..e55fd0a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-rc1
+ * Move crc_check_chance out of compression options (CASSANDRA-9839)
* Fix descending iteration past end of BTreeSearchIterator (CASSANDRA-10301)
* Transfer hints to a different node on decommission (CASSANDRA-10198)
* Check partition keys for CAS operations during stmt validation (CASSANDRA-10338)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 94f3c37..924c35f 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -94,6 +94,8 @@ Upgrading
- The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
The new options are `class` and `chunk_length_in_kb`. Disabling compression should now
be done by setting the new option `enabled` to `false`.
+ - The compression option `crc_check_chance` became a top-level table option, but is currently
+ enforced only against tables with enabled compression.
- Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
has been deprecated since 2.1.0 and is being removed in 3.0.0.
- The 'index_interval' option for 'CREATE TABLE' statements, which has been deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 41fa4fd..07968d0 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1184,7 +1184,7 @@ class Shell(cmd.Cmd):
return self.get_table_meta(ks, name)
except ColumnFamilyNotFound:
try:
- return self.get_view_meta(ks, name)
+ return self.get_view_meta(ks, name)
except MaterializedViewNotFound:
raise ObjectNotFound("%r not found in keyspace %r" % (name, ks))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar
deleted file mode 100644
index 0f01a28..0000000
Binary files a/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..cf30f25
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip
deleted file mode 100644
index e672bd9..0000000
Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip
new file mode 100644
index 0000000..5605ef7
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 69bf6bf..00ca704 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -199,6 +199,12 @@ public final class CFMetaData
return this;
}
+ public CFMetaData crcCheckChance(double prop)
+ {
+ params = TableParams.builder(params).crcCheckChance(prop).build();
+ return this;
+ }
+
public CFMetaData speculativeRetry(SpeculativeRetryParam prop)
{
params = TableParams.builder(params).speculativeRetry(prop).build();
@@ -270,7 +276,8 @@ public final class CFMetaData
isIndex = cfName.contains(".");
- assert partitioner != null;
+ assert partitioner != null : "This assertion failure is probably due to accessing Schema.instance " +
+ "from client-mode tools - See CASSANDRA-8143.";
this.partitioner = partitioner;
// A compact table should always have a clustering
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
index ed64f0d..9e7bbfe 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
@@ -73,7 +73,17 @@ public final class TableAttributes extends PropertyDefinitions
builder.compaction(CompactionParams.fromMap(getMap(Option.COMPACTION)));
if (hasOption(Option.COMPRESSION))
+ {
+ //crc_check_chance was "promoted" from a compression property to a top-level-property after #9839
+ //so we temporarily accept it to be defined as a compression option, to maintain backwards compatibility
+ Map<String, String> compressionOpts = getMap(Option.COMPRESSION);
+ if (compressionOpts.containsKey(Option.CRC_CHECK_CHANCE.toString().toLowerCase()))
+ {
+ Double crcCheckChance = getDeprecatedCrcCheckChance(compressionOpts);
+ builder.crcCheckChance(crcCheckChance);
+ }
builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION)));
+ }
if (hasOption(Option.DCLOCAL_READ_REPAIR_CHANCE))
builder.dcLocalReadRepairChance(getDouble(Option.DCLOCAL_READ_REPAIR_CHANCE));
@@ -99,9 +109,25 @@ public final class TableAttributes extends PropertyDefinitions
if (hasOption(Option.SPECULATIVE_RETRY))
builder.speculativeRetry(SpeculativeRetryParam.fromString(getString(Option.SPECULATIVE_RETRY)));
+ if (hasOption(Option.CRC_CHECK_CHANCE))
+ builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
+
return builder.build();
}
+ private Double getDeprecatedCrcCheckChance(Map<String, String> compressionOpts)
+ {
+ String value = compressionOpts.get(Option.CRC_CHECK_CHANCE.toString().toLowerCase());
+ try
+ {
+ return Double.parseDouble(value);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new SyntaxException(String.format("Invalid double value %s for crc_check_chance.'", value));
+ }
+ }
+
private double getDouble(Option option)
{
String value = getString(option);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cdb9770..a9a8f80 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -196,8 +196,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final ViewManager.ForStore viewManager;
/* These are locally held copies to be changed from the config during runtime */
- private volatile DefaultInteger minCompactionThreshold;
- private volatile DefaultInteger maxCompactionThreshold;
+ private volatile DefaultValue<Integer> minCompactionThreshold;
+ private volatile DefaultValue<Integer> maxCompactionThreshold;
+ private volatile DefaultValue<Double> crcCheckChance;
+
private final CompactionStrategyManager compactionStrategyManager;
private volatile Directories directories;
@@ -219,10 +221,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// only update these runtime-modifiable settings if they have not been modified.
if (!minCompactionThreshold.isModified())
for (ColumnFamilyStore cfs : concatWithIndexes())
- cfs.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold());
+ cfs.minCompactionThreshold = new DefaultValue(metadata.params.compaction.minCompactionThreshold());
if (!maxCompactionThreshold.isModified())
for (ColumnFamilyStore cfs : concatWithIndexes())
- cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
+ cfs.maxCompactionThreshold = new DefaultValue(metadata.params.compaction.maxCompactionThreshold());
+ if (!crcCheckChance.isModified())
+ for (ColumnFamilyStore cfs : concatWithIndexes())
+ cfs.crcCheckChance = new DefaultValue(metadata.params.crcCheckChance);
compactionStrategyManager.maybeReload(metadata);
directories = compactionStrategyManager.getDirectories();
@@ -333,19 +338,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- // FIXME: this is wrong, JMX should never update live CFMetaData objects
- public void setCrcCheckChance(double crcCheckChance)
- {
- try
- {
- metadata.params.compression.setCrcCheckChance(crcCheckChance);
- }
- catch (ConfigurationException e)
- {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
-
private ColumnFamilyStore(Keyspace keyspace,
String columnFamilyName,
int generation,
@@ -370,14 +362,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
this.keyspace = keyspace;
- name = columnFamilyName;
this.metadata = metadata;
- this.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold());
- this.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
this.directories = directories;
- this.indexManager = new SecondaryIndexManager(this);
- this.viewManager = keyspace.viewManager.forTable(metadata.cfId);
- this.metric = new TableMetrics(this);
+ name = columnFamilyName;
+ minCompactionThreshold = new DefaultValue<>(metadata.params.compaction.minCompactionThreshold());
+ maxCompactionThreshold = new DefaultValue<>(metadata.params.compaction.maxCompactionThreshold());
+ crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance);
+ indexManager = new SecondaryIndexManager(this);
+ viewManager = keyspace.viewManager.forTable(metadata.cfId);
+ metric = new TableMetrics(this);
fileIndexGenerator.set(generation);
sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -394,7 +387,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// compaction strategy should be created after the CFS has been prepared
- this.compactionStrategyManager = new CompactionStrategyManager(this);
+ compactionStrategyManager = new CompactionStrategyManager(this);
this.directories = this.compactionStrategyManager.getDirectories();
if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
@@ -2041,6 +2034,26 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return compactionStrategyManager;
}
+ public void setCrcCheckChance(double crcCheckChance)
+ {
+ try
+ {
+ TableParams.builder().crcCheckChance(crcCheckChance).build().validate();
+ for (ColumnFamilyStore cfs : concatWithIndexes())
+ cfs.crcCheckChance.set(crcCheckChance);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+
+ public Double getCrcCheckChance()
+ {
+ return crcCheckChance.value();
+ }
+
public void setCompactionThresholds(int minThreshold, int maxThreshold)
{
validateCompactionThresholds(minThreshold, maxThreshold);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 7294923..b2759e6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -21,6 +21,7 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.Checksum;
+import java.util.function.Supplier;
import com.google.common.primitives.Ints;
@@ -45,12 +46,14 @@ public class CompressedRandomAccessReader extends RandomAccessReader
// raw checksum bytes
private ByteBuffer checksumBytes;
+ private final Supplier<Double> crcCheckChanceSupplier;
protected CompressedRandomAccessReader(Builder builder)
{
super(builder);
this.metadata = builder.metadata;
this.checksum = metadata.checksumType.newInstance();
+ crcCheckChanceSupplier = builder.crcCheckChanceSupplier;
if (regions == null)
{
@@ -121,7 +124,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
buffer.flip();
}
- if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
{
compressed.rewind();
metadata.checksumType.update( checksum, (compressed));
@@ -183,7 +186,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
buffer.flip();
}
- if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
{
compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
@@ -236,18 +239,21 @@ public class CompressedRandomAccessReader extends RandomAccessReader
public final static class Builder extends RandomAccessReader.Builder
{
private final CompressionMetadata metadata;
+ private final Supplier<Double> crcCheckChanceSupplier;
public Builder(ICompressedFile file)
{
super(file.channel());
this.metadata = applyMetadata(file.getMetadata());
this.regions = file.regions();
+ this.crcCheckChanceSupplier = file.getCrcCheckChanceSupplier();
}
public Builder(ChannelProxy channel, CompressionMetadata metadata)
{
super(channel);
this.metadata = applyMetadata(metadata);
+ this.crcCheckChanceSupplier = (() -> 1.0); //100% crc_check_chance
}
private CompressionMetadata applyMetadata(CompressionMetadata metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 069cc96..3a3b024 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -127,7 +127,7 @@ public class LZ4Compressor implements ICompressor
public Set<String> supportedOptions()
{
- return new HashSet<>(Arrays.asList(CompressionParams.CRC_CHECK_CHANCE));
+ return new HashSet<>();
}
public BufferType preferredBufferType()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ebf28a4..c4ef239 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -655,12 +655,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
// e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
// here when we know we're being wired into the rest of the server infrastructure.
keyCache = CacheService.instance.keyCache;
-
- // ensure secondary index compression metadata is linked to the parent metadata.
- if (compression && metadata.isIndex())
+ final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+ if (cfs != null)
{
- getCompressionMetadata().parameters.setLiveMetadata(
- Schema.instance.getCFMetaData(metadata.ksName, metadata.getParentColumnFamilyName()));
+ ifile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance);
+ dfile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance);
}
}
@@ -1642,6 +1641,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return dfile.onDiskLength;
}
+ @VisibleForTesting
+ public double getCrcCheckChance()
+ {
+ return dfile.getCrcCheckChanceSupplier().get();
+ }
+
/**
* Mark the sstable as obsolete, i.e., compacted into newer sstables.
*
@@ -2047,8 +2052,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
tidy.setup(this, trackHotness);
this.readMeter = tidy.global.readMeter;
- if (compression)
- getCompressionMetadata().parameters.setLiveMetadata(metadata);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/IChecksummedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/IChecksummedFile.java b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java
new file mode 100644
index 0000000..fa15a5e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.util.function.Supplier;
+
+public interface IChecksummedFile
+{
+ public Supplier<Double> getCrcCheckChanceSupplier();
+ public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index 43d37dc..c149fd1 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -19,9 +19,10 @@ package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressionMetadata;
-public interface ICompressedFile
+public interface ICompressedFile extends IChecksummedFile
{
ChannelProxy channel();
CompressionMetadata getMetadata();
MmappedRegions regions();
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index c2a2374..2504ecd 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
+import java.util.function.Supplier;
import com.google.common.util.concurrent.RateLimiter;
@@ -48,7 +49,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
* would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
* each access to that segment.
*/
-public abstract class SegmentedFile extends SharedCloseableImpl
+public abstract class SegmentedFile extends SharedCloseableImpl implements IChecksummedFile
{
public final ChannelProxy channel;
public final int bufferSize;
@@ -57,6 +58,8 @@ public abstract class SegmentedFile extends SharedCloseableImpl
// This differs from length for compressed files (but we still need length for
// SegmentIterator because offsets in the file are relative to the uncompressed size)
public final long onDiskLength;
+ private Supplier<Double> crcCheckChanceSupplier = () -> 1.0;
+
/**
* Use getBuilder to get a Builder to construct a SegmentedFile.
@@ -134,6 +137,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
return reader;
}
+ public Supplier<Double> getCrcCheckChanceSupplier()
+ {
+ return crcCheckChanceSupplier;
+ }
+
+ public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier)
+ {
+ this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+ }
+
public void dropPageCache(long before)
{
CLibrary.trySkipCache(channel.getFileDescriptor(), 0, before, path());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/CompressionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index a73fcd1..443b6f1 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -38,6 +39,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.compress.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.utils.NoSpamLogger;
import static java.lang.String.format;
@@ -48,9 +51,9 @@ public final class CompressionParams
private static volatile boolean hasLoggedSsTableCompressionWarning;
private static volatile boolean hasLoggedChunkLengthWarning;
+ private static volatile boolean hasLoggedCrcCheckChanceWarning;
public static final int DEFAULT_CHUNK_LENGTH = 65536;
- public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
public static final IVersionedSerializer<CompressionParams> serializer = new Serializer();
public static final String CLASS = "class";
@@ -61,18 +64,16 @@ public final class CompressionParams
DEFAULT_CHUNK_LENGTH,
Collections.emptyMap());
+ private static final String CRC_CHECK_CHANCE_WARNING = "The option crc_check_chance was deprecated as a compression option. " +
+ "You should specify it as a top-level table option instead";
+
@Deprecated public static final String SSTABLE_COMPRESSION = "sstable_compression";
@Deprecated public static final String CHUNK_LENGTH_KB = "chunk_length_kb";
-
- public static final String CRC_CHECK_CHANCE = "crc_check_chance";
-
- public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE);
+ @Deprecated public static final String CRC_CHECK_CHANCE = "crc_check_chance";
private final ICompressor sstableCompressor;
private final Integer chunkLength;
- private volatile double crcCheckChance;
- private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be use by the compressor
- private CFMetaData liveMetadata;
+ private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor
public static CompressionParams fromMap(Map<String, String> opts)
{
@@ -153,8 +154,6 @@ public final class CompressionParams
this.sstableCompressor = sstableCompressor;
this.chunkLength = chunkLength;
this.otherOptions = ImmutableMap.copyOf(otherOptions);
- String chance = otherOptions.get(CRC_CHECK_CHANCE);
- this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance);
}
public CompressionParams copy()
@@ -162,23 +161,6 @@ public final class CompressionParams
return new CompressionParams(sstableCompressor, chunkLength, otherOptions);
}
- public void setLiveMetadata(final CFMetaData liveMetadata)
- {
- if (liveMetadata == null)
- return;
-
- this.liveMetadata = liveMetadata;
- }
-
- public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException
- {
- validateCrcCheckChance(crcCheckChance);
- this.crcCheckChance = crcCheckChance;
-
- if (liveMetadata != null && this != liveMetadata.params.compression)
- liveMetadata.params.compression.setCrcCheckChance(crcCheckChance);
- }
-
/**
* Checks if compression is enabled.
* @return {@code true} if compression is enabled, {@code false} otherwise.
@@ -202,31 +184,6 @@ public final class CompressionParams
return otherOptions;
}
- public double getCrcCheckChance()
- {
- return liveMetadata == null ? this.crcCheckChance : liveMetadata.params.compression.crcCheckChance;
- }
-
- private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException
- {
- try
- {
- double chance = Double.parseDouble(crcCheckChance);
- validateCrcCheckChance(chance);
- return chance;
- }
- catch (NumberFormatException e)
- {
- throw new ConfigurationException("crc_check_chance should be a double");
- }
- }
-
- private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException
- {
- if (crcCheckChance < 0.0d || crcCheckChance > 1.0d)
- throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0");
- }
-
public int chunkLength()
{
return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength;
@@ -257,14 +214,23 @@ public final class CompressionParams
return null;
}
+ if (compressionOptions.containsKey(CRC_CHECK_CHANCE))
+ {
+ if (!hasLoggedCrcCheckChanceWarning)
+ {
+ logger.warn(CRC_CHECK_CHANCE_WARNING);
+ hasLoggedCrcCheckChanceWarning = true;
+ }
+ compressionOptions.remove(CRC_CHECK_CHANCE);
+ }
+
try
{
Method method = compressorClass.getMethod("create", Map.class);
ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions);
// Check for unknown options
- AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
for (String provided : compressionOptions.keySet())
- if (!supportedOpts.contains(provided))
+ if (!compressor.supportedOptions().contains(provided))
throw new ConfigurationException("Unknown compression options " + provided);
return compressor;
}
@@ -363,7 +329,7 @@ public final class CompressionParams
if (options.containsKey(CHUNK_LENGTH_KB))
{
- if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning)
+ if (!hasLoggedChunkLengthWarning)
{
hasLoggedChunkLengthWarning = true;
logger.warn(format("The %s option has been deprecated. You should use %s instead",
@@ -474,8 +440,6 @@ public final class CompressionParams
c >>= 1;
}
}
-
- validateCrcCheckChance(crcCheckChance);
}
public Map<String, String> asMap()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 0d5a040..f23ec0b 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -361,7 +361,13 @@ public final class LegacySchemaMigrator
if (row.has("speculative_retry"))
params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
- params.compression(CompressionParams.fromMap(fromJsonMap(row.getString("compression_parameters"))));
+ Map<String, String> compressionParameters = fromJsonMap(row.getString("compression_parameters"));
+ String crcCheckChance = compressionParameters.remove("crc_check_chance");
+ //crc_check_chance was promoted from a compression property to a top-level property
+ if (crcCheckChance != null)
+ params.crcCheckChance(Double.parseDouble(crcCheckChance));
+
+ params.compression(CompressionParams.fromMap(compressionParameters));
params.compaction(compactionFromRow(row));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 2376fad..fb97ca5 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -98,6 +98,7 @@ public final class SchemaKeyspace
+ "comment text,"
+ "compaction frozen<map<text, text>>,"
+ "compression frozen<map<text, text>>,"
+ + "crc_check_chance double,"
+ "dclocal_read_repair_chance double,"
+ "default_time_to_live int,"
+ "extensions frozen<map<text, blob>>,"
@@ -159,6 +160,7 @@ public final class SchemaKeyspace
+ "comment text,"
+ "compaction frozen<map<text, text>>,"
+ "compression frozen<map<text, text>>,"
+ + "crc_check_chance double,"
+ "dclocal_read_repair_chance double,"
+ "default_time_to_live int,"
+ "extensions frozen<map<text, blob>>,"
@@ -908,6 +910,7 @@ public final class SchemaKeyspace
.add("min_index_interval", params.minIndexInterval)
.add("read_repair_chance", params.readRepairChance)
.add("speculative_retry", params.speculativeRetry.toString())
+ .add("crc_check_chance", params.crcCheckChance)
.frozenMap("caching", params.caching.asMap())
.frozenMap("compaction", params.compaction.asMap())
.frozenMap("compression", params.compression.asMap())
@@ -1149,6 +1152,7 @@ public final class SchemaKeyspace
.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
.minIndexInterval(row.getInt("min_index_interval"))
.readRepairChance(row.getDouble("read_repair_chance"))
+ .crcCheckChance(row.getDouble("crc_check_chance"))
.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
if (row.has("extensions"))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 64e2c36..7e44e73 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -46,7 +46,8 @@ public final class TableParams
MEMTABLE_FLUSH_PERIOD_IN_MS,
MIN_INDEX_INTERVAL,
READ_REPAIR_CHANCE,
- SPECULATIVE_RETRY;
+ SPECULATIVE_RETRY,
+ CRC_CHECK_CHANCE;
@Override
public String toString()
@@ -63,11 +64,13 @@ public final class TableParams
public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0;
public static final int DEFAULT_MIN_INDEX_INTERVAL = 128;
public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048;
+ public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
public final String comment;
public final double readRepairChance;
public final double dcLocalReadRepairChance;
public final double bloomFilterFpChance;
+ public final double crcCheckChance;
public final int gcGraceSeconds;
public final int defaultTimeToLive;
public final int memtableFlushPeriodInMs;
@@ -87,6 +90,7 @@ public final class TableParams
bloomFilterFpChance = builder.bloomFilterFpChance == null
? builder.compaction.defaultBloomFilterFbChance()
: builder.bloomFilterFpChance;
+ crcCheckChance = builder.crcCheckChance;
gcGraceSeconds = builder.gcGraceSeconds;
defaultTimeToLive = builder.defaultTimeToLive;
memtableFlushPeriodInMs = builder.memtableFlushPeriodInMs;
@@ -112,6 +116,7 @@ public final class TableParams
.compaction(params.compaction)
.compression(params.compression)
.dcLocalReadRepairChance(params.dcLocalReadRepairChance)
+ .crcCheckChance(params.crcCheckChance)
.defaultTimeToLive(params.defaultTimeToLive)
.gcGraceSeconds(params.gcGraceSeconds)
.maxIndexInterval(params.maxIndexInterval)
@@ -148,6 +153,13 @@ public final class TableParams
readRepairChance);
}
+ if (crcCheckChance < 0 || crcCheckChance > 1.0)
+ {
+ fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
+ Option.CRC_CHECK_CHANCE,
+ crcCheckChance);
+ }
+
if (defaultTimeToLive < 0)
fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive);
@@ -190,6 +202,7 @@ public final class TableParams
&& readRepairChance == p.readRepairChance
&& dcLocalReadRepairChance == p.dcLocalReadRepairChance
&& bloomFilterFpChance == p.bloomFilterFpChance
+ && crcCheckChance == p.crcCheckChance
&& gcGraceSeconds == p.gcGraceSeconds
&& defaultTimeToLive == p.defaultTimeToLive
&& memtableFlushPeriodInMs == p.memtableFlushPeriodInMs
@@ -209,6 +222,7 @@ public final class TableParams
readRepairChance,
dcLocalReadRepairChance,
bloomFilterFpChance,
+ crcCheckChance,
gcGraceSeconds,
defaultTimeToLive,
memtableFlushPeriodInMs,
@@ -229,6 +243,7 @@ public final class TableParams
.add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance)
.add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance)
.add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance)
+ .add(Option.CRC_CHECK_CHANCE.toString(), crcCheckChance)
.add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds)
.add(Option.DEFAULT_TIME_TO_LIVE.toString(), defaultTimeToLive)
.add(Option.MEMTABLE_FLUSH_PERIOD_IN_MS.toString(), memtableFlushPeriodInMs)
@@ -248,6 +263,7 @@ public final class TableParams
private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
private Double bloomFilterFpChance;
+ public Double crcCheckChance = DEFAULT_CRC_CHECK_CHANCE;
private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS;
@@ -292,6 +308,12 @@ public final class TableParams
return this;
}
+ public Builder crcCheckChance(double val)
+ {
+ crcCheckChance = val;
+ return this;
+ }
+
public Builder gcGraceSeconds(int val)
{
gcGraceSeconds = val;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 0a118b2..ccd0ac5 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
import java.util.zip.Checksum;
import com.google.common.collect.Iterators;
@@ -41,6 +42,7 @@ public class CompressedInputStream extends InputStream
private final CompressionInfo info;
// chunk buffer
private final BlockingQueue<byte[]> dataBuffer;
+ private final Supplier<Double> crcCheckChanceSupplier;
// uncompressed bytes
private byte[] buffer;
@@ -65,13 +67,14 @@ public class CompressedInputStream extends InputStream
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType)
+ public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
{
this.info = info;
this.checksum = checksumType.newInstance();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.crcCheckChanceSupplier = crcCheckChanceSupplier;
new Thread(new Reader(source, info, dataBuffer)).start();
}
@@ -111,7 +114,7 @@ public class CompressedInputStream extends InputStream
totalCompressedBytesRead += compressed.length;
// validate crc randomly
- if (info.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ if (this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
{
checksum.update(compressed, 0, compressed.length - checksumBytes.length);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index f702e24..69c7b87 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -77,7 +77,8 @@ public class CompressedStreamReader extends StreamReader
SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType());
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+ inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/utils/DefaultInteger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DefaultInteger.java b/src/java/org/apache/cassandra/utils/DefaultInteger.java
deleted file mode 100644
index 2a3efc7..0000000
--- a/src/java/org/apache/cassandra/utils/DefaultInteger.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-
-public class DefaultInteger
-{
- private final int originalValue;
- private int currentValue;
-
- public DefaultInteger(int value)
- {
- originalValue = value;
- currentValue = value;
- }
-
- public int value()
- {
- return currentValue;
- }
-
- public void set(int i)
- {
- currentValue = i;
- }
-
- public void reset()
- {
- currentValue = originalValue;
- }
-
- public boolean isModified()
- {
- return originalValue != currentValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/utils/DefaultValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DefaultValue.java b/src/java/org/apache/cassandra/utils/DefaultValue.java
new file mode 100644
index 0000000..5697ede
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/DefaultValue.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+/** Holds a value alongside the original it was constructed with, so it can be reset and checked for modification. */
+public class DefaultValue<T>
+{
+ private final T originalValue;
+ private T currentValue;
+ // Both the original and the current value start out as the supplied value.
+ public DefaultValue(T value)
+ {
+ originalValue = value;
+ currentValue = value;
+ }
+ // Returns the current (possibly overridden) value.
+ public T value()
+ {
+ return currentValue;
+ }
+ // Overrides the current value; the original is retained so reset() can restore it.
+ public void set(T i)
+ {
+ currentValue = i;
+ }
+ // Restores the value supplied at construction.
+ public void reset()
+ {
+ currentValue = originalValue;
+ }
+ // Compares with equals(), not identity: T may be a boxed type whose equal values are distinct objects.
+ public boolean isModified()
+ {
+ return !java.util.Objects.equals(originalValue, currentValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
index e0879d2..3a68e4a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
@@ -35,11 +35,28 @@ import org.apache.cassandra.utils.FBUtilities;
public class CrcCheckChanceTest extends CQLTester
{
+
+
@Test
- public void testChangingCrcCheckChance() throws Throwable
+ public void testChangingCrcCheckChanceNewFormat() throws Throwable
+ {
+ testChangingCrcCheckChance(true);
+ }
+
+ @Test
+ public void testChangingCrcCheckChanceOldFormat() throws Throwable
+ {
+ testChangingCrcCheckChance(false);
+ }
+
+
+ public void testChangingCrcCheckChance(boolean newFormat) throws Throwable
{
//Start with crc_check_chance of 99%
- createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}");
+ if (newFormat)
+ createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor'} AND crc_check_chance = 0.99;");
+ else
+ createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}");
execute("CREATE INDEX foo ON %s(v)");
@@ -47,37 +64,37 @@ public class CrcCheckChanceTest extends CQLTester
execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
-
ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable());
ColumnFamilyStore indexCfs = cfs.indexManager.getAllIndexColumnFamilyStores().iterator().next();
cfs.forceBlockingFlush();
- Assert.assertEquals(0.99, cfs.metadata.params.compression.getCrcCheckChance());
- Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
- Assert.assertEquals(0.99, indexCfs.metadata.params.compression.getCrcCheckChance());
- Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
+ Assert.assertEquals(0.99, cfs.getCrcCheckChance());
+ Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+ Assert.assertEquals(0.99, indexCfs.getCrcCheckChance());
+ Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
//Test for stack overflow
- cfs.setCrcCheckChance(0.99);
+ if (newFormat)
+ alterTable("ALTER TABLE %s WITH crc_check_chance = 0.99");
+ else
+ alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.99}");
assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"),
- row("p1", "k1", "sv1", "v1"),
- row("p1", "k2", "sv1", "v2")
+ row("p1", "k1", "sv1", "v1"),
+ row("p1", "k2", "sv1", "v2")
);
assertRows(execute("SELECT * FROM %s WHERE v=?", "v1"),
- row("p1", "k1", "sv1", "v1")
+ row("p1", "k1", "sv1", "v1")
);
//Write a few SSTables then Compact
-
execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1");
execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
cfs.forceBlockingFlush();
-
execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1");
execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
@@ -89,34 +106,45 @@ public class CrcCheckChanceTest extends CQLTester
execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
cfs.forceBlockingFlush();
-
cfs.forceMajorCompaction();
- //Verify when we alter the value the live sstable readers hold the new one
- alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.01}");
+ //Now let's change via JMX
+ cfs.setCrcCheckChance(0.01);
- Assert.assertEquals( 0.01, cfs.metadata.params.compression.getCrcCheckChance());
- Assert.assertEquals( 0.01, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
- Assert.assertEquals( 0.01, indexCfs.metadata.params.compression.getCrcCheckChance());
- Assert.assertEquals( 0.01, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
+ Assert.assertEquals(0.01, cfs.getCrcCheckChance());
+ Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+ Assert.assertEquals(0.01, indexCfs.getCrcCheckChance());
+ Assert.assertEquals(0.01, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"),
- row("p1", "k1", "sv1", "v1"),
- row("p1", "k2", "sv1", "v2")
+ row("p1", "k1", "sv1", "v1"),
+ row("p1", "k2", "sv1", "v2")
);
assertRows(execute("SELECT * FROM %s WHERE v=?", "v1"),
- row("p1", "k1", "sv1", "v1")
+ row("p1", "k1", "sv1", "v1")
);
+ //Alter again via schema
+ if (newFormat)
+ alterTable("ALTER TABLE %s WITH crc_check_chance = 0.5");
+ else
+ alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.5}");
+
+ //We should be able to get the new value by directly accessing the schema metadata
+ Assert.assertEquals(0.5, cfs.metadata.params.crcCheckChance);
+
+ //but previous JMX-set value will persist until next restart
+ Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+ Assert.assertEquals(0.01, indexCfs.getCrcCheckChance());
+ Assert.assertEquals(0.01, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
//Verify the call used by JMX still works
cfs.setCrcCheckChance(0.03);
- Assert.assertEquals( 0.03, cfs.metadata.params.compression.getCrcCheckChance());
- Assert.assertEquals( 0.03, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
- Assert.assertEquals( 0.03, indexCfs.metadata.params.compression.getCrcCheckChance());
- Assert.assertEquals( 0.03, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
-
+ Assert.assertEquals(0.03, cfs.getCrcCheckChance());
+ Assert.assertEquals(0.03, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+ Assert.assertEquals(0.03, indexCfs.getCrcCheckChance());
+ Assert.assertEquals(0.03, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 30670fb..db05a3e 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -108,7 +108,8 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+ ChecksumType.CRC32, () -> 1.0);
DataInputStream in = new DataInputStream(input);
for (int i = 0; i < sections.size(); i++)