You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/19 12:49:49 UTC

[2/3] cassandra git commit: Moved crc_check_chance out of compression options

Moved crc_check_chance out of compression options

Patch by Paulo Motta; reviewed by Sam Tunnicliffe for CASSANDRA-9839


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e25453ba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e25453ba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e25453ba

Branch: refs/heads/trunk
Commit: e25453ba55d35cac802719c6fb906460f13a9c36
Parents: aef7169
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Sep 2 19:24:47 2015 -0300
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Sat Sep 19 11:38:55 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 bin/cqlsh.py                                    |   2 +-
 ...ore-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar | Bin 2217119 -> 0 bytes
 ...ore-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar | Bin 0 -> 2282233 bytes
 ...iver-internal-only-3.0.0a2.post0-03085e6.zip | Bin 230830 -> 0 bytes
 ...iver-internal-only-3.0.0a2.post0-379b6f2.zip | Bin 0 -> 230837 bytes
 .../org/apache/cassandra/config/CFMetaData.java |   9 +-
 .../cql3/statements/TableAttributes.java        |  26 ++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  61 ++++++++------
 .../compress/CompressedRandomAccessReader.java  |  10 ++-
 .../cassandra/io/compress/LZ4Compressor.java    |   2 +-
 .../io/sstable/format/SSTableReader.java        |  17 ++--
 .../cassandra/io/util/IChecksummedFile.java     |  27 ++++++
 .../cassandra/io/util/ICompressedFile.java      |   3 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  15 +++-
 .../cassandra/schema/CompressionParams.java     |  78 +++++------------
 .../cassandra/schema/LegacySchemaMigrator.java  |   8 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   4 +
 .../apache/cassandra/schema/TableParams.java    |  24 +++++-
 .../compress/CompressedInputStream.java         |   7 +-
 .../compress/CompressedStreamReader.java        |   3 +-
 .../apache/cassandra/utils/DefaultInteger.java  |  51 -----------
 .../apache/cassandra/utils/DefaultValue.java    |  51 +++++++++++
 .../miscellaneous/CrcCheckChanceTest.java       |  84 ++++++++++++-------
 .../compression/CompressedInputStreamTest.java  |   3 +-
 26 files changed, 308 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2c0cde2..e55fd0a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Move crc_check_chance out of compression options (CASSANDRA-9839)
  * Fix descending iteration past end of BTreeSearchIterator (CASSANDRA-10301)
  * Transfer hints to a different node on decommission (CASSANDRA-10198)
  * Check partition keys for CAS operations during stmt validation (CASSANDRA-10338)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 94f3c37..924c35f 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -94,6 +94,8 @@ Upgrading
    - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
      The new options are `class` and `chunk_length_in_kb`. Disabling compression should now
      be done by setting the new option `enabled` to `false`.
+   - The compression option `crc_check_chance` became a top-level table option, but is currently
+     enforced only against tables with enabled compression.
    - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
      has been deprecated since 2.1.0 and is being removed in 3.0.0.
    - The 'index_interval' option for 'CREATE TABLE' statements, which has been deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 41fa4fd..07968d0 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1184,7 +1184,7 @@ class Shell(cmd.Cmd):
             return self.get_table_meta(ks, name)
         except ColumnFamilyNotFound:
             try:
-               return self.get_view_meta(ks, name)
+                return self.get_view_meta(ks, name)
             except MaterializedViewNotFound:
                 raise ObjectNotFound("%r not found in keyspace %r" % (name, ks))
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar
deleted file mode 100644
index 0f01a28..0000000
Binary files a/lib/cassandra-driver-core-3.0.0-alpha3-b6aa814-SNAPSHOT-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..cf30f25
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-alpha3-f9b7e7c-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip
deleted file mode 100644
index e672bd9..0000000
Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-03085e6.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip
new file mode 100644
index 0000000..5605ef7
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a2.post0-379b6f2.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 69bf6bf..00ca704 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -199,6 +199,12 @@ public final class CFMetaData
         return this;
     }
 
+    public CFMetaData crcCheckChance(double prop)
+    {
+        params = TableParams.builder(params).crcCheckChance(prop).build();
+        return this;
+    }
+
     public CFMetaData speculativeRetry(SpeculativeRetryParam prop)
     {
         params = TableParams.builder(params).speculativeRetry(prop).build();
@@ -270,7 +276,8 @@ public final class CFMetaData
 
         isIndex = cfName.contains(".");
 
-        assert partitioner != null;
+        assert partitioner != null : "This assertion failure is probably due to accessing Schema.instance " +
+                                     "from client-mode tools - See CASSANDRA-8143.";
         this.partitioner = partitioner;
 
         // A compact table should always have a clustering

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
index ed64f0d..9e7bbfe 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
@@ -73,7 +73,17 @@ public final class TableAttributes extends PropertyDefinitions
             builder.compaction(CompactionParams.fromMap(getMap(Option.COMPACTION)));
 
         if (hasOption(Option.COMPRESSION))
+        {
+            //crc_check_chance was "promoted" from a compression property to a top-level-property after #9839
+            //so we temporarily accept it to be defined as a compression option, to maintain backwards compatibility
+            Map<String, String> compressionOpts = getMap(Option.COMPRESSION);
+            if (compressionOpts.containsKey(Option.CRC_CHECK_CHANCE.toString().toLowerCase()))
+            {
+                Double crcCheckChance = getDeprecatedCrcCheckChance(compressionOpts);
+                builder.crcCheckChance(crcCheckChance);
+            }
             builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION)));
+        }
 
         if (hasOption(Option.DCLOCAL_READ_REPAIR_CHANCE))
             builder.dcLocalReadRepairChance(getDouble(Option.DCLOCAL_READ_REPAIR_CHANCE));
@@ -99,9 +109,25 @@ public final class TableAttributes extends PropertyDefinitions
         if (hasOption(Option.SPECULATIVE_RETRY))
             builder.speculativeRetry(SpeculativeRetryParam.fromString(getString(Option.SPECULATIVE_RETRY)));
 
+        if (hasOption(Option.CRC_CHECK_CHANCE))
+            builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
+
         return builder.build();
     }
 
+    private Double getDeprecatedCrcCheckChance(Map<String, String> compressionOpts)
+    {
+        String value = compressionOpts.get(Option.CRC_CHECK_CHANCE.toString().toLowerCase());
+        try
+        {
+            return Double.parseDouble(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new SyntaxException(String.format("Invalid double value %s for crc_check_chance.'", value));
+        }
+    }
+
     private double getDouble(Option option)
     {
         String value = getString(option);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cdb9770..a9a8f80 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -196,8 +196,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final ViewManager.ForStore viewManager;
 
     /* These are locally held copies to be changed from the config during runtime */
-    private volatile DefaultInteger minCompactionThreshold;
-    private volatile DefaultInteger maxCompactionThreshold;
+    private volatile DefaultValue<Integer> minCompactionThreshold;
+    private volatile DefaultValue<Integer> maxCompactionThreshold;
+    private volatile DefaultValue<Double> crcCheckChance;
+
     private final CompactionStrategyManager compactionStrategyManager;
 
     private volatile Directories directories;
@@ -219,10 +221,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // only update these runtime-modifiable settings if they have not been modified.
         if (!minCompactionThreshold.isModified())
             for (ColumnFamilyStore cfs : concatWithIndexes())
-                cfs.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold());
+                cfs.minCompactionThreshold = new DefaultValue(metadata.params.compaction.minCompactionThreshold());
         if (!maxCompactionThreshold.isModified())
             for (ColumnFamilyStore cfs : concatWithIndexes())
-                cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
+                cfs.maxCompactionThreshold = new DefaultValue(metadata.params.compaction.maxCompactionThreshold());
+        if (!crcCheckChance.isModified())
+            for (ColumnFamilyStore cfs : concatWithIndexes())
+                cfs.crcCheckChance = new DefaultValue(metadata.params.crcCheckChance);
 
         compactionStrategyManager.maybeReload(metadata);
         directories = compactionStrategyManager.getDirectories();
@@ -333,19 +338,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    // FIXME: this is wrong, JMX should never update live CFMetaData objects
-    public void setCrcCheckChance(double crcCheckChance)
-    {
-        try
-        {
-            metadata.params.compression.setCrcCheckChance(crcCheckChance);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IllegalArgumentException(e.getMessage());
-        }
-    }
-
     private ColumnFamilyStore(Keyspace keyspace,
                              String columnFamilyName,
                              int generation,
@@ -370,14 +362,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
 
         this.keyspace = keyspace;
-        name = columnFamilyName;
         this.metadata = metadata;
-        this.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold());
-        this.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
         this.directories = directories;
-        this.indexManager = new SecondaryIndexManager(this);
-        this.viewManager = keyspace.viewManager.forTable(metadata.cfId);
-        this.metric = new TableMetrics(this);
+        name = columnFamilyName;
+        minCompactionThreshold = new DefaultValue<>(metadata.params.compaction.minCompactionThreshold());
+        maxCompactionThreshold = new DefaultValue<>(metadata.params.compaction.maxCompactionThreshold());
+        crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance);
+        indexManager = new SecondaryIndexManager(this);
+        viewManager = keyspace.viewManager.forTable(metadata.cfId);
+        metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
 
@@ -394,7 +387,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // compaction strategy should be created after the CFS has been prepared
-        this.compactionStrategyManager = new CompactionStrategyManager(this);
+        compactionStrategyManager = new CompactionStrategyManager(this);
         this.directories = this.compactionStrategyManager.getDirectories();
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
@@ -2041,6 +2034,26 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return compactionStrategyManager;
     }
 
+    public void setCrcCheckChance(double crcCheckChance)
+    {
+        try
+        {
+            TableParams.builder().crcCheckChance(crcCheckChance).build().validate();
+            for (ColumnFamilyStore cfs : concatWithIndexes())
+                cfs.crcCheckChance.set(crcCheckChance);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IllegalArgumentException(e.getMessage());
+        }
+    }
+
+
+    public Double getCrcCheckChance()
+    {
+        return crcCheckChance.value();
+    }
+
     public void setCompactionThresholds(int minThreshold, int maxThreshold)
     {
         validateCompactionThresholds(minThreshold, maxThreshold);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 7294923..b2759e6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.zip.Checksum;
+import java.util.function.Supplier;
 
 import com.google.common.primitives.Ints;
 
@@ -45,12 +46,14 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
     // raw checksum bytes
     private ByteBuffer checksumBytes;
+    private final Supplier<Double> crcCheckChanceSupplier;
 
     protected CompressedRandomAccessReader(Builder builder)
     {
         super(builder);
         this.metadata = builder.metadata;
         this.checksum = metadata.checksumType.newInstance();
+        crcCheckChanceSupplier = builder.crcCheckChanceSupplier;
 
         if (regions == null)
         {
@@ -121,7 +124,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
                 buffer.flip();
             }
 
-            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+            if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
             {
                 compressed.rewind();
                 metadata.checksumType.update( checksum, (compressed));
@@ -183,7 +186,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
                 buffer.flip();
             }
 
-            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+            if (crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
             {
                 compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 
@@ -236,18 +239,21 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     public final static class Builder extends RandomAccessReader.Builder
     {
         private final CompressionMetadata metadata;
+        private final Supplier<Double> crcCheckChanceSupplier;
 
         public Builder(ICompressedFile file)
         {
             super(file.channel());
             this.metadata = applyMetadata(file.getMetadata());
             this.regions = file.regions();
+            this.crcCheckChanceSupplier = file.getCrcCheckChanceSupplier();
         }
 
         public Builder(ChannelProxy channel, CompressionMetadata metadata)
         {
             super(channel);
             this.metadata = applyMetadata(metadata);
+            this.crcCheckChanceSupplier = (() -> 1.0); //100% crc_check_chance
         }
 
         private CompressionMetadata applyMetadata(CompressionMetadata metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 069cc96..3a3b024 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -127,7 +127,7 @@ public class LZ4Compressor implements ICompressor
 
     public Set<String> supportedOptions()
     {
-        return new HashSet<>(Arrays.asList(CompressionParams.CRC_CHECK_CHANCE));
+        return new HashSet<>();
     }
 
     public BufferType preferredBufferType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ebf28a4..c4ef239 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -655,12 +655,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
         // here when we know we're being wired into the rest of the server infrastructure.
         keyCache = CacheService.instance.keyCache;
-
-        // ensure secondary index compression metadata is linked to the parent metadata.
-        if (compression && metadata.isIndex())
+        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+        if (cfs != null)
         {
-            getCompressionMetadata().parameters.setLiveMetadata(
-                    Schema.instance.getCFMetaData(metadata.ksName, metadata.getParentColumnFamilyName()));
+            ifile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance);
+            dfile.setCrcCheckChanceSupplier(cfs::getCrcCheckChance);
         }
     }
 
@@ -1642,6 +1641,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return dfile.onDiskLength;
     }
 
+    @VisibleForTesting
+    public double getCrcCheckChance()
+    {
+        return dfile.getCrcCheckChanceSupplier().get();
+    }
+
     /**
      * Mark the sstable as obsolete, i.e., compacted into newer sstables.
      *
@@ -2047,8 +2052,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     {
         tidy.setup(this, trackHotness);
         this.readMeter = tidy.global.readMeter;
-        if (compression)
-            getCompressionMetadata().parameters.setLiveMetadata(metadata);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/IChecksummedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/IChecksummedFile.java b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java
new file mode 100644
index 0000000..fa15a5e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/IChecksummedFile.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.util.function.Supplier;
+
+public interface IChecksummedFile
+{
+    public Supplier<Double> getCrcCheckChanceSupplier();
+    public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index 43d37dc..c149fd1 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -19,9 +19,10 @@ package org.apache.cassandra.io.util;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
-public interface ICompressedFile
+public interface ICompressedFile extends IChecksummedFile
 {
     ChannelProxy channel();
     CompressionMetadata getMetadata();
     MmappedRegions regions();
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index c2a2374..2504ecd 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
+import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -48,7 +49,7 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
  * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
  * each access to that segment.
  */
-public abstract class SegmentedFile extends SharedCloseableImpl
+public abstract class SegmentedFile extends SharedCloseableImpl implements IChecksummedFile
 {
     public final ChannelProxy channel;
     public final int bufferSize;
@@ -57,6 +58,8 @@ public abstract class SegmentedFile extends SharedCloseableImpl
     // This differs from length for compressed files (but we still need length for
     // SegmentIterator because offsets in the file are relative to the uncompressed size)
     public final long onDiskLength;
+    private Supplier<Double> crcCheckChanceSupplier = () -> 1.0;
+
 
     /**
      * Use getBuilder to get a Builder to construct a SegmentedFile.
@@ -134,6 +137,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         return reader;
     }
 
+    public Supplier<Double> getCrcCheckChanceSupplier()
+    {
+        return crcCheckChanceSupplier;
+    }
+
+    public void setCrcCheckChanceSupplier(Supplier<Double> crcCheckChanceSupplier)
+    {
+        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+    }
+
     public void dropPageCache(long before)
     {
         CLibrary.trySkipCache(channel.getFileDescriptor(), 0, before, path());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/CompressionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index a73fcd1..443b6f1 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -38,6 +39,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.lang.String.format;
 
@@ -48,9 +51,9 @@ public final class CompressionParams
 
     private static volatile boolean hasLoggedSsTableCompressionWarning;
     private static volatile boolean hasLoggedChunkLengthWarning;
+    private static volatile boolean hasLoggedCrcCheckChanceWarning;
 
     public static final int DEFAULT_CHUNK_LENGTH = 65536;
-    public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
     public static final IVersionedSerializer<CompressionParams> serializer = new Serializer();
 
     public static final String CLASS = "class";
@@ -61,18 +64,16 @@ public final class CompressionParams
                                                                           DEFAULT_CHUNK_LENGTH,
                                                                           Collections.emptyMap());
 
+    private static final String CRC_CHECK_CHANCE_WARNING = "The option crc_check_chance was deprecated as a compression option. " +
+                                                           "You should specify it as a top-level table option instead";
+
     @Deprecated public static final String SSTABLE_COMPRESSION = "sstable_compression";
     @Deprecated public static final String CHUNK_LENGTH_KB = "chunk_length_kb";
-
-    public static final String CRC_CHECK_CHANCE = "crc_check_chance";
-
-    public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE);
+    @Deprecated public static final String CRC_CHECK_CHANCE = "crc_check_chance";
 
     private final ICompressor sstableCompressor;
     private final Integer chunkLength;
-    private volatile double crcCheckChance;
-    private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be use by the compressor
-    private CFMetaData liveMetadata;
+    private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor
 
     public static CompressionParams fromMap(Map<String, String> opts)
     {
@@ -153,8 +154,6 @@ public final class CompressionParams
         this.sstableCompressor = sstableCompressor;
         this.chunkLength = chunkLength;
         this.otherOptions = ImmutableMap.copyOf(otherOptions);
-        String chance = otherOptions.get(CRC_CHECK_CHANCE);
-        this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance);
     }
 
     public CompressionParams copy()
@@ -162,23 +161,6 @@ public final class CompressionParams
         return new CompressionParams(sstableCompressor, chunkLength, otherOptions);
     }
 
-    public void setLiveMetadata(final CFMetaData liveMetadata)
-    {
-        if (liveMetadata == null)
-            return;
-
-        this.liveMetadata = liveMetadata;
-    }
-
-    public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException
-    {
-        validateCrcCheckChance(crcCheckChance);
-        this.crcCheckChance = crcCheckChance;
-
-        if (liveMetadata != null && this != liveMetadata.params.compression)
-            liveMetadata.params.compression.setCrcCheckChance(crcCheckChance);
-    }
-
     /**
      * Checks if compression is enabled.
      * @return {@code true} if compression is enabled, {@code false} otherwise.
@@ -202,31 +184,6 @@ public final class CompressionParams
         return otherOptions;
     }
 
-    public double getCrcCheckChance()
-    {
-        return liveMetadata == null ? this.crcCheckChance : liveMetadata.params.compression.crcCheckChance;
-    }
-
-    private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException
-    {
-        try
-        {
-            double chance = Double.parseDouble(crcCheckChance);
-            validateCrcCheckChance(chance);
-            return chance;
-        }
-        catch (NumberFormatException e)
-        {
-            throw new ConfigurationException("crc_check_chance should be a double");
-        }
-    }
-
-    private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException
-    {
-        if (crcCheckChance < 0.0d || crcCheckChance > 1.0d)
-            throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0");
-    }
-
     public int chunkLength()
     {
         return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength;
@@ -257,14 +214,23 @@ public final class CompressionParams
             return null;
         }
 
+        if (compressionOptions.containsKey(CRC_CHECK_CHANCE))
+        {
+            if (!hasLoggedCrcCheckChanceWarning)
+            {
+                logger.warn(CRC_CHECK_CHANCE_WARNING);
+                hasLoggedCrcCheckChanceWarning = true;
+            }
+            compressionOptions.remove(CRC_CHECK_CHANCE);
+        }
+
         try
         {
             Method method = compressorClass.getMethod("create", Map.class);
             ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions);
             // Check for unknown options
-            AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
             for (String provided : compressionOptions.keySet())
-                if (!supportedOpts.contains(provided))
+                if (!compressor.supportedOptions().contains(provided))
                     throw new ConfigurationException("Unknown compression options " + provided);
             return compressor;
         }
@@ -363,7 +329,7 @@ public final class CompressionParams
 
         if (options.containsKey(CHUNK_LENGTH_KB))
         {
-            if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning)
+            if (!hasLoggedChunkLengthWarning)
             {
                 hasLoggedChunkLengthWarning = true;
                 logger.warn(format("The %s option has been deprecated. You should use %s instead",
@@ -474,8 +440,6 @@ public final class CompressionParams
                 c >>= 1;
             }
         }
-
-        validateCrcCheckChance(crcCheckChance);
     }
 
     public Map<String, String> asMap()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 0d5a040..f23ec0b 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -361,7 +361,13 @@ public final class LegacySchemaMigrator
         if (row.has("speculative_retry"))
             params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
 
-        params.compression(CompressionParams.fromMap(fromJsonMap(row.getString("compression_parameters"))));
+        Map<String, String> compressionParameters = fromJsonMap(row.getString("compression_parameters"));
+        String crcCheckChance = compressionParameters.remove("crc_check_chance");
+        //crc_check_chance was promoted from a compression property to a top-level property
+        if (crcCheckChance != null)
+            params.crcCheckChance(Double.parseDouble(crcCheckChance));
+
+        params.compression(CompressionParams.fromMap(compressionParameters));
 
         params.compaction(compactionFromRow(row));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 2376fad..fb97ca5 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -98,6 +98,7 @@ public final class SchemaKeyspace
                 + "comment text,"
                 + "compaction frozen<map<text, text>>,"
                 + "compression frozen<map<text, text>>,"
+                + "crc_check_chance double,"
                 + "dclocal_read_repair_chance double,"
                 + "default_time_to_live int,"
                 + "extensions frozen<map<text, blob>>,"
@@ -159,6 +160,7 @@ public final class SchemaKeyspace
                 + "comment text,"
                 + "compaction frozen<map<text, text>>,"
                 + "compression frozen<map<text, text>>,"
+                + "crc_check_chance double,"
                 + "dclocal_read_repair_chance double,"
                 + "default_time_to_live int,"
                 + "extensions frozen<map<text, blob>>,"
@@ -908,6 +910,7 @@ public final class SchemaKeyspace
              .add("min_index_interval", params.minIndexInterval)
              .add("read_repair_chance", params.readRepairChance)
              .add("speculative_retry", params.speculativeRetry.toString())
+             .add("crc_check_chance", params.crcCheckChance)
              .frozenMap("caching", params.caching.asMap())
              .frozenMap("compaction", params.compaction.asMap())
              .frozenMap("compression", params.compression.asMap())
@@ -1149,6 +1152,7 @@ public final class SchemaKeyspace
                .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
                .minIndexInterval(row.getInt("min_index_interval"))
                .readRepairChance(row.getDouble("read_repair_chance"))
+               .crcCheckChance(row.getDouble("crc_check_chance"))
                .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
 
         if (row.has("extensions"))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 64e2c36..7e44e73 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -46,7 +46,8 @@ public final class TableParams
         MEMTABLE_FLUSH_PERIOD_IN_MS,
         MIN_INDEX_INTERVAL,
         READ_REPAIR_CHANCE,
-        SPECULATIVE_RETRY;
+        SPECULATIVE_RETRY,
+        CRC_CHECK_CHANCE;
 
         @Override
         public String toString()
@@ -63,11 +64,13 @@ public final class TableParams
     public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0;
     public static final int DEFAULT_MIN_INDEX_INTERVAL = 128;
     public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048;
+    public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
 
     public final String comment;
     public final double readRepairChance;
     public final double dcLocalReadRepairChance;
     public final double bloomFilterFpChance;
+    public final double crcCheckChance;
     public final int gcGraceSeconds;
     public final int defaultTimeToLive;
     public final int memtableFlushPeriodInMs;
@@ -87,6 +90,7 @@ public final class TableParams
         bloomFilterFpChance = builder.bloomFilterFpChance == null
                             ? builder.compaction.defaultBloomFilterFbChance()
                             : builder.bloomFilterFpChance;
+        crcCheckChance = builder.crcCheckChance;
         gcGraceSeconds = builder.gcGraceSeconds;
         defaultTimeToLive = builder.defaultTimeToLive;
         memtableFlushPeriodInMs = builder.memtableFlushPeriodInMs;
@@ -112,6 +116,7 @@ public final class TableParams
                             .compaction(params.compaction)
                             .compression(params.compression)
                             .dcLocalReadRepairChance(params.dcLocalReadRepairChance)
+                            .crcCheckChance(params.crcCheckChance)
                             .defaultTimeToLive(params.defaultTimeToLive)
                             .gcGraceSeconds(params.gcGraceSeconds)
                             .maxIndexInterval(params.maxIndexInterval)
@@ -148,6 +153,13 @@ public final class TableParams
                  readRepairChance);
         }
 
+        if (crcCheckChance < 0 || crcCheckChance > 1.0)
+        {
+            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
+                 Option.CRC_CHECK_CHANCE,
+                 crcCheckChance);
+        }
+
         if (defaultTimeToLive < 0)
             fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive);
 
@@ -190,6 +202,7 @@ public final class TableParams
             && readRepairChance == p.readRepairChance
             && dcLocalReadRepairChance == p.dcLocalReadRepairChance
             && bloomFilterFpChance == p.bloomFilterFpChance
+            && crcCheckChance == p.crcCheckChance
             && gcGraceSeconds == p.gcGraceSeconds
             && defaultTimeToLive == p.defaultTimeToLive
             && memtableFlushPeriodInMs == p.memtableFlushPeriodInMs
@@ -209,6 +222,7 @@ public final class TableParams
                                 readRepairChance,
                                 dcLocalReadRepairChance,
                                 bloomFilterFpChance,
+                                crcCheckChance,
                                 gcGraceSeconds,
                                 defaultTimeToLive,
                                 memtableFlushPeriodInMs,
@@ -229,6 +243,7 @@ public final class TableParams
                           .add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance)
                           .add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance)
                           .add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance)
+                          .add(Option.CRC_CHECK_CHANCE.toString(), crcCheckChance)
                           .add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds)
                           .add(Option.DEFAULT_TIME_TO_LIVE.toString(), defaultTimeToLive)
                           .add(Option.MEMTABLE_FLUSH_PERIOD_IN_MS.toString(), memtableFlushPeriodInMs)
@@ -248,6 +263,7 @@ public final class TableParams
         private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
         private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
         private Double bloomFilterFpChance;
+        public Double crcCheckChance = DEFAULT_CRC_CHECK_CHANCE;
         private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
         private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
         private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS;
@@ -292,6 +308,12 @@ public final class TableParams
             return this;
         }
 
+        public Builder crcCheckChance(double val)
+        {
+            crcCheckChance = val;
+            return this;
+        }
+
         public Builder gcGraceSeconds(int val)
         {
             gcGraceSeconds = val;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 0a118b2..ccd0ac5 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 import java.util.zip.Checksum;
 
 import com.google.common.collect.Iterators;
@@ -41,6 +42,7 @@ public class CompressedInputStream extends InputStream
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
+    private final Supplier<Double> crcCheckChanceSupplier;
 
     // uncompressed bytes
     private byte[] buffer;
@@ -65,13 +67,14 @@ public class CompressedInputStream extends InputStream
      * @param source Input source to read compressed data from
      * @param info Compression info
      */
-    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType)
+    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
     {
         this.info = info;
         this.checksum =  checksumType.newInstance();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
 
         new Thread(new Reader(source, info, dataBuffer)).start();
     }
@@ -111,7 +114,7 @@ public class CompressedInputStream extends InputStream
         totalCompressedBytesRead += compressed.length;
 
         // validate crc randomly
-        if (info.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+        if (this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
         {
             checksum.update(compressed, 0, compressed.length - checksumBytes.length);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index f702e24..69c7b87 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -77,7 +77,8 @@ public class CompressedStreamReader extends StreamReader
 
         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType());
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/utils/DefaultInteger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DefaultInteger.java b/src/java/org/apache/cassandra/utils/DefaultInteger.java
deleted file mode 100644
index 2a3efc7..0000000
--- a/src/java/org/apache/cassandra/utils/DefaultInteger.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-
-public class DefaultInteger
-{
-    private final int originalValue;
-    private int currentValue;
-
-    public DefaultInteger(int value)
-    {
-        originalValue = value;
-        currentValue = value;
-    }
-
-    public int value()
-    {
-        return currentValue;
-    }
-
-    public void set(int i)
-    {
-        currentValue = i;
-    }
-
-    public void reset()
-    {
-        currentValue = originalValue;
-    }
-
-    public boolean isModified()
-    {
-        return originalValue != currentValue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/src/java/org/apache/cassandra/utils/DefaultValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DefaultValue.java b/src/java/org/apache/cassandra/utils/DefaultValue.java
new file mode 100644
index 0000000..5697ede
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/DefaultValue.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Objects;
+
+/**
+ * Holds a value together with the value it was originally constructed with, so the
+ * current value can be changed, compared against the original, and restored.
+ *
+ * @param <T> type of the held value
+ */
+public class DefaultValue<T>
+{
+    private final T originalValue;
+    private T currentValue;
+
+    /**
+     * @param value the original value; also used as the initial current value
+     */
+    public DefaultValue(T value)
+    {
+        originalValue = value;
+        currentValue = value;
+    }
+
+    /** @return the current value */
+    public T value()
+    {
+        return currentValue;
+    }
+
+    /** Replaces the current value; the original value is left untouched. */
+    public void set(T i)
+    {
+        currentValue = i;
+    }
+
+    /** Restores the current value to the original value. */
+    public void reset()
+    {
+        currentValue = originalValue;
+    }
+
+    /**
+     * @return {@code true} if the current value differs from the original one
+     */
+    public boolean isModified()
+    {
+        // Compare by value: T is a reference type (e.g. a boxed Double), so != would
+        // report equal values held in distinct objects as modified.
+        return !Objects.equals(originalValue, currentValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
index e0879d2..3a68e4a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
@@ -35,11 +35,28 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class CrcCheckChanceTest extends CQLTester
 {
+
+
     @Test
-    public void testChangingCrcCheckChance() throws Throwable
+    public void testChangingCrcCheckChanceNewFormat() throws Throwable
+    {
+        testChangingCrcCheckChance(true);
+    }
+
+    @Test
+    public void testChangingCrcCheckChanceOldFormat() throws Throwable
+    {
+        testChangingCrcCheckChance(false);
+    }
+
+
+    public void testChangingCrcCheckChance(boolean newFormat) throws Throwable
     {
         //Start with crc_check_chance of 99%
-        createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}");
+        if (newFormat)
+            createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor'} AND crc_check_chance = 0.99;");
+        else
+            createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}");
 
         execute("CREATE INDEX foo ON %s(v)");
 
@@ -47,37 +64,37 @@ public class CrcCheckChanceTest extends CQLTester
         execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
         execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
 
-
         ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable());
         ColumnFamilyStore indexCfs = cfs.indexManager.getAllIndexColumnFamilyStores().iterator().next();
         cfs.forceBlockingFlush();
 
-        Assert.assertEquals(0.99, cfs.metadata.params.compression.getCrcCheckChance());
-        Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
-        Assert.assertEquals(0.99, indexCfs.metadata.params.compression.getCrcCheckChance());
-        Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
+        Assert.assertEquals(0.99, cfs.getCrcCheckChance());
+        Assert.assertEquals(0.99, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+        Assert.assertEquals(0.99, indexCfs.getCrcCheckChance());
+        Assert.assertEquals(0.99, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
 
         //Test for stack overflow
-        cfs.setCrcCheckChance(0.99);
+        if (newFormat)
+            alterTable("ALTER TABLE %s WITH crc_check_chance = 0.99");
+        else
+            alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.99}");
 
         assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"),
-                row("p1", "k1", "sv1", "v1"),
-                row("p1", "k2", "sv1", "v2")
+                   row("p1", "k1", "sv1", "v1"),
+                   row("p1", "k2", "sv1", "v2")
         );
 
         assertRows(execute("SELECT * FROM %s WHERE v=?", "v1"),
-                row("p1", "k1", "sv1", "v1")
+                   row("p1", "k1", "sv1", "v1")
         );
 
         //Write a few SSTables then Compact
-
         execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1");
         execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
         execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
 
         cfs.forceBlockingFlush();
 
-
         execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1");
         execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
         execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
@@ -89,34 +106,45 @@ public class CrcCheckChanceTest extends CQLTester
         execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
 
         cfs.forceBlockingFlush();
-
         cfs.forceMajorCompaction();
 
-        //Verify when we alter the value the live sstable readers hold the new one
-        alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.01}");
+        //Now let's change via JMX
+        cfs.setCrcCheckChance(0.01);
 
-        Assert.assertEquals( 0.01, cfs.metadata.params.compression.getCrcCheckChance());
-        Assert.assertEquals( 0.01, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
-        Assert.assertEquals( 0.01, indexCfs.metadata.params.compression.getCrcCheckChance());
-        Assert.assertEquals( 0.01, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
+        Assert.assertEquals(0.01, cfs.getCrcCheckChance());
+        Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+        Assert.assertEquals(0.01, indexCfs.getCrcCheckChance());
+        Assert.assertEquals(0.01, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
 
         assertRows(execute("SELECT * FROM %s WHERE p=?", "p1"),
-                row("p1", "k1", "sv1", "v1"),
-                row("p1", "k2", "sv1", "v2")
+                   row("p1", "k1", "sv1", "v1"),
+                   row("p1", "k2", "sv1", "v2")
         );
 
         assertRows(execute("SELECT * FROM %s WHERE v=?", "v1"),
-                row("p1", "k1", "sv1", "v1")
+                   row("p1", "k1", "sv1", "v1")
         );
 
+        //Alter again via schema
+        if (newFormat)
+            alterTable("ALTER TABLE %s WITH crc_check_chance = 0.5");
+        else
+            alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.5}");
+
+        //We should be able to get the new value by accessing directly the schema metadata
+        Assert.assertEquals(0.5, cfs.metadata.params.crcCheckChance);
+
+        //but previous JMX-set value will persist until next restart
+        Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+        Assert.assertEquals(0.01, indexCfs.getCrcCheckChance());
+        Assert.assertEquals(0.01, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
 
         //Verify the call used by JMX still works
         cfs.setCrcCheckChance(0.03);
-        Assert.assertEquals( 0.03, cfs.metadata.params.compression.getCrcCheckChance());
-        Assert.assertEquals( 0.03, cfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
-        Assert.assertEquals( 0.03, indexCfs.metadata.params.compression.getCrcCheckChance());
-        Assert.assertEquals( 0.03, indexCfs.getLiveSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
-
+        Assert.assertEquals(0.03, cfs.getCrcCheckChance());
+        Assert.assertEquals(0.03, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());
+        Assert.assertEquals(0.03, indexCfs.getCrcCheckChance());
+        Assert.assertEquals(0.03, indexCfs.getLiveSSTables().iterator().next().getCrcCheckChance());
     }
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e25453ba/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 30670fb..db05a3e 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -108,7 +108,8 @@ public class CompressedInputStreamTest
 
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
-        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, ChecksumType.CRC32);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+                                                                ChecksumType.CRC32, () -> 1.0);
         DataInputStream in = new DataInputStream(input);
 
         for (int i = 0; i < sections.size(); i++)