Posted to commits@cassandra.apache.org by al...@apache.org on 2015/07/12 17:00:46 UTC

cassandra git commit: Change CREATE/ALTER TABLE syntax for compression

Repository: cassandra
Updated Branches:
  refs/heads/trunk 418c7936f -> 056115fff


Change CREATE/ALTER TABLE syntax for compression

patch by Benjamin Lerer; reviewed by Aleksey Yeschenko for
CASSANDRA-8384


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056115ff
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056115ff
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056115ff

Branch: refs/heads/trunk
Commit: 056115fff93b16071f3fda067329c64dd2fc1f05
Parents: 418c793
Author: blerer <be...@datastax.com>
Authored: Thu Jun 25 14:21:48 2015 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Jul 12 17:58:49 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   7 +-
 NEWS.txt                                        |   4 +
 doc/cql3/CQL.textile                            |   9 +-
 .../org/apache/cassandra/config/CFMetaData.java |   4 +-
 .../cassandra/cql3/statements/CFPropDefs.java   |  27 +-
 .../cql3/statements/CreateTableStatement.java   |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +-
 .../apache/cassandra/hadoop/ConfigHelper.java   |  12 -
 .../io/compress/CompressedSequentialWriter.java |   4 +-
 .../io/compress/CompressionMetadata.java        |   8 +-
 .../io/compress/CompressionParameters.java      | 254 ++++++++++++++++---
 .../io/sstable/format/SSTableWriter.java        |   2 +-
 .../cassandra/schema/LegacySchemaMigrator.java  |   2 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   4 +-
 .../compress/CompressedInputStream.java         |   2 +-
 .../cassandra/thrift/ThriftConversion.java      |  37 ++-
 .../unit/org/apache/cassandra/SchemaLoader.java |   8 +-
 .../apache/cassandra/config/CFMetaDataTest.java |   2 +-
 .../cql3/validation/operations/AlterTest.java   |  81 ++++++
 .../cql3/validation/operations/CreateTest.java  | 113 ++++++++-
 .../org/apache/cassandra/db/VerifyTest.java     |   3 +-
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../CompressedSequentialWriterTest.java         |  14 +-
 .../schema/LegacySchemaMigratorTest.java        |   3 +-
 .../cassandra/schema/SchemaKeyspaceTest.java    |   3 +-
 .../compression/CompressedInputStreamTest.java  |   2 +-
 26 files changed, 506 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
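For readers skimming the diff, the user-visible change is a rename of the compression sub-options accepted by CREATE/ALTER TABLE. An illustrative before/after (keyspace and table names are placeholders):

    -- deprecated syntax
    ALTER TABLE ks.tbl WITH compression =
        { 'sstable_compression' : 'LZ4Compressor', 'chunk_length_kb' : 64 };

    -- new syntax introduced by this patch
    ALTER TABLE ks.tbl WITH compression =
        { 'class' : 'LZ4Compressor', 'chunk_length_in_kb' : 64 };

Both spellings are still accepted; the old names now only log a deprecation warning (see the CompressionParameters changes below), and mixing old and new names in the same statement is rejected.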


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b26efd5..b978c59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
  * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
  * Storage engine refactor (CASSANDRA-8099, 9743)
  * Update Guava to 18.0 (CASSANDRA-9653)
@@ -15,10 +16,14 @@
  * Populate TokenMetadata early during startup (CASSANDRA-9317)
  * undeprecate cache recentHitRate (CASSANDRA-6591)
  * Add support for selectively varint encoding fields (CASSANDRA-9499)
+
+
+2.2.0-rc3
+ * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
 Merged from 2.1:
  * Fix clientutil jar and tests (CASSANDRA-9760)
  * (cqlsh) Allow the SSL protocol version to be specified through the
-   config file or environment variables (CASSANDRA-9544)
+    config file or environment variables (CASSANDRA-9544)
 Merged from 2.0:
  * Can't transition from write survey to normal mode (CASSANDRA-9740)
  * Scrub (recover) sstables even when -Index.db is missing (CASSANDRA-9591)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ce05b92..55b32d5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -35,6 +35,10 @@ Upgrading
    - hinted_handoff_enabled in cassandra.yaml no longer supports a list of data centers.
      To specify a list of excluded data centers when hinted_handoff_enabled is set to true,
      use hinted_handoff_disabled_datacenters, see CASSANDRA-9035 for details.
+   - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
+     The new options are `class` and `chunk_length_in_kb`. Disabling compression should now
+     be done by setting the new option `enabled` to `false`.
+
 
 2.2
 ===

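As the NEWS entry above notes, the way compression is disabled also changes: the empty compressor class is replaced by the new 'enabled' flag. A sketch of the two forms, mirroring the tests further down (placeholder table name):

    -- old way of disabling compression (still accepted, deprecated)
    CREATE TABLE ks.tbl (k int PRIMARY KEY, v text)
        WITH compression = { 'sstable_compression' : '' };

    -- new way
    CREATE TABLE ks.tbl (k int PRIMARY KEY, v text)
        WITH compression = { 'enabled' : 'false' };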
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index b49d37b..9750a4e 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -361,10 +361,11 @@ The @compaction@ property must at least define the @'class'@ sub-option, that de
 
 For the @compression@ property, the following default sub-options are available:
 
-|_. option              |_. default        |_. description |
-| @sstable_compression@ | LZ4Compressor    | The compression algorithm to use. Default compressor are: LZ4Compressor, SnappyCompressor and DeflateCompressor. Use an empty string (@''@) to disable compression. Custom compressor can be provided by specifying the full class name as a "string constant":#constants.|
-| @chunk_length_kb@     | 64KB             | On disk SSTables are compressed by block (to allow random reads). This defines the size (in KB) of said block. Bigger values may improve the compression rate, but increases the minimum size of data to be read from disk for a read |
-| @crc_check_chance@    | 1.0              | When compression is enabled, each compressed block includes a checksum of that block for the purpose of detecting disk bitrot and avoiding the propagation of corruption to other replica. This option defines the probability with which those checksums are checked during read. By default they are always checked. Set to 0 to disable checksum checking and to 0.5 for instance to check them every other read|
+|_. option                 |_. default        |_. description |
+| @class@                  | LZ4Compressor    | The compression algorithm to use. The default compressors are LZ4Compressor, SnappyCompressor and DeflateCompressor. Use @'enabled' : false@ to disable compression. A custom compressor can be provided by specifying its full class name as a "string constant":#constants.|
+| @enabled@                | true             | By default, compression is enabled. To disable it, set @enabled@ to @false@.|
+| @chunk_length_in_kb@     | 64KB             | On disk, SSTables are compressed by block (to allow random reads). This defines the size (in KB) of those blocks. Bigger values may improve the compression rate, but increase the minimum amount of data that has to be read from disk for a read |
+| @crc_check_chance@       | 1.0              | When compression is enabled, each compressed block includes a checksum of that block for the purpose of detecting disk bitrot and avoiding the propagation of corruption to other replicas. This option defines the probability with which those checksums are checked during reads. By default they are always checked. Set it to 0 to disable checksum checking, or to 0.5, for instance, to check them on every other read|
 
 
 h4. Other considerations:

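Putting the documented sub-options together, a full example might look like the following (an illustrative sketch only; keyspace and table names are placeholders):

    CREATE TABLE ks.tbl (k int PRIMARY KEY, v text)
        WITH compression = { 'class' : 'DeflateCompressor',
                             'chunk_length_in_kb' : 128,
                             'crc_check_chance' : 0.5 };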
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 53d2171..fea2638 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -218,7 +218,7 @@ public final class CFMetaData
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
     public volatile Map<String, String> compactionStrategyOptions = new HashMap<>();
 
-    public volatile CompressionParameters compressionParameters = new CompressionParameters(null);
+    public volatile CompressionParameters compressionParameters = CompressionParameters.noCompression();
 
     // attribute setters that return the modified CFMetaData instance
     public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;}
@@ -1340,7 +1340,7 @@ public final class CFMetaData
             .append("columnMetadata", columnMetadata.values())
             .append("compactionStrategyClass", compactionStrategyClass)
             .append("compactionStrategyOptions", compactionStrategyOptions)
-            .append("compressionParameters", compressionParameters.asThriftOptions())
+            .append("compressionParameters", compressionParameters.asMap())
             .append("bloomFilterFpChance", getBloomFilterFpChance())
             .append("memtableFlushPeriod", memtableFlushPeriod)
             .append("caching", caching)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 9ba2b61..56db85a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -103,19 +103,14 @@ public class CFPropDefs extends PropertyDefinitions
         Map<String, String> compressionOptions = getCompressionOptions();
         if (!compressionOptions.isEmpty())
         {
-            String sstableCompressionClass = compressionOptions.get(CompressionParameters.SSTABLE_COMPRESSION);
-            if (sstableCompressionClass == null)
-                throw new ConfigurationException("Missing sub-option '" + CompressionParameters.SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
-
-            Integer chunkLength = CompressionParameters.DEFAULT_CHUNK_LENGTH;
-            if (compressionOptions.containsKey(CompressionParameters.CHUNK_LENGTH_KB))
-                chunkLength = CompressionParameters.parseChunkLength(compressionOptions.get(CompressionParameters.CHUNK_LENGTH_KB));
-
-            Map<String, String> remainingOptions = new HashMap<>(compressionOptions);
-            remainingOptions.remove(CompressionParameters.SSTABLE_COMPRESSION);
-            remainingOptions.remove(CompressionParameters.CHUNK_LENGTH_KB);
-            CompressionParameters cp = new CompressionParameters(sstableCompressionClass, chunkLength, remainingOptions);
-            cp.validate();
+            if (CompressionParameters.isEnabled(compressionOptions)
+                && !CompressionParameters.containsSstableCompressionClass(compressionOptions))
+            {
+                throw new ConfigurationException("Missing sub-option '" + CompressionParameters.CLASS + "' for the '" + KW_COMPRESSION + "' option.");
+            }
+
+            CompressionParameters compressionParameters = CompressionParameters.fromMap(compressionOptions);
+            compressionParameters.validate();
         }
 
         validateMinimumInt(KW_DEFAULT_TIME_TO_LIVE, 0, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE);
@@ -200,7 +195,11 @@ public class CFPropDefs extends PropertyDefinitions
         cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
 
         if (!getCompressionOptions().isEmpty())
-            cfm.compressionParameters(CompressionParameters.create(getCompressionOptions()));
+        {
+            CompressionParameters compressionParameters = CompressionParameters.fromMap(getCompressionOptions());
+            compressionParameters.validate();
+            cfm.compressionParameters(compressionParameters);
+        }
         CachingOptions cachingOptions = getCachingOptions();
         if (cachingOptions != null)
             cfm.caching(cachingOptions);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 8602af1..8cd6bc8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -76,7 +76,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
             this.properties.addProperty(CFPropDefs.KW_COMPRESSION,
                                         new HashMap<String, String>()
                                         {{
-                                            put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
+                                            put(CompressionParameters.CLASS, CFMetaData.DEFAULT_COMPRESSOR);
                                         }});
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c2eb657..845a4fc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -273,14 +273,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Map<String,String> getCompressionParameters()
     {
-        return metadata.compressionParameters().asThriftOptions();
+        return metadata.compressionParameters().asMap();
     }
 
     public void setCompressionParameters(Map<String,String> opts)
     {
         try
         {
-            metadata.compressionParameters = CompressionParameters.create(opts);
+            metadata.compressionParameters = CompressionParameters.fromMap(opts);
+            metadata.compressionParameters.validate();
         }
         catch (ConfigurationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index e81860d..b5c66df 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -483,18 +483,6 @@ public class ConfigHelper
         return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
     }
 
-    public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
-    {
-        if (getOutputCompressionClass(conf) == null)
-            return new CompressionParameters(null);
-
-        Map<String, String> options = new HashMap<String, String>(2);
-        options.put(CompressionParameters.SSTABLE_COMPRESSION, getOutputCompressionClass(conf));
-        options.put(CompressionParameters.CHUNK_LENGTH_KB, getOutputCompressionChunkLength(conf));
-
-        return CompressionParameters.create(options);
-    }
-
     public static boolean getOutputLocalDCOnly(Configuration conf)
     {
         return Boolean.parseBoolean(conf.get(OUTPUT_LOCAL_DC_ONLY, "false"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 708ec75..0717121 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -63,8 +63,8 @@ public class CompressedSequentialWriter extends SequentialWriter
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
     {
-        super(file, parameters.chunkLength(), parameters.sstableCompressor.preferredBufferType());
-        this.compressor = parameters.sstableCompressor;
+        super(file, parameters.chunkLength(), parameters.getSstableCompressor().preferredBufferType());
+        this.compressor = parameters.getSstableCompressor();
 
         // buffer for compression should be the same size as buffer itself
         compressed = compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 070be9f..2975a3d 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -141,7 +141,7 @@ public class CompressionMetadata
 
     public ICompressor compressor()
     {
-        return parameters.sstableCompressor;
+        return parameters.getSstableCompressor();
     }
 
     public int chunkLength()
@@ -304,9 +304,9 @@ public class CompressionMetadata
         {
             try
             {
-                out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
-                out.writeInt(parameters.otherOptions.size());
-                for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
+                out.writeUTF(parameters.getSstableCompressor().getClass().getSimpleName());
+                out.writeInt(parameters.getOtherOptions().size());
+                for (Map.Entry<String, String> entry : parameters.getOtherOptions().entrySet())
                 {
                     out.writeUTF(entry.getKey());
                     out.writeUTF(entry.getValue());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
index 264d523..c828e27 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
@@ -26,11 +26,15 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
@@ -41,60 +45,111 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class CompressionParameters
 {
+    private final static Logger LOGGER = LoggerFactory.getLogger(CompressionParameters.class);
+
+    private volatile static boolean hasLoggedSsTableCompressionWarning;
+    private volatile static boolean hasLoggedChunkLengthWarning;
+
     public final static int DEFAULT_CHUNK_LENGTH = 65536;
     public final static double DEFAULT_CRC_CHECK_CHANCE = 1.0;
     public final static IVersionedSerializer<CompressionParameters> serializer = new Serializer();
 
+    public static final String CLASS = "class";
+    public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb";
+    public static final String ENABLED = "enabled";
+    @Deprecated
     public static final String SSTABLE_COMPRESSION = "sstable_compression";
+    @Deprecated
     public static final String CHUNK_LENGTH_KB = "chunk_length_kb";
     public static final String CRC_CHECK_CHANCE = "crc_check_chance";
 
     public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE);
 
-    public final ICompressor sstableCompressor;
+    private final ICompressor sstableCompressor;
     private final Integer chunkLength;
     private volatile double crcCheckChance;
-    public final Map<String, String> otherOptions; // Unrecognized options, can be use by the compressor
+    private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor
     private CFMetaData liveMetadata;
 
-    public static CompressionParameters create(Map<? extends CharSequence, ? extends CharSequence> opts) throws ConfigurationException
+    public static CompressionParameters fromMap(Map<? extends CharSequence, ? extends CharSequence> opts)
     {
         Map<String, String> options = copyOptions(opts);
-        String sstableCompressionClass = options.get(SSTABLE_COMPRESSION);
-        String chunkLength = options.get(CHUNK_LENGTH_KB);
-        options.remove(SSTABLE_COMPRESSION);
-        options.remove(CHUNK_LENGTH_KB);
-        CompressionParameters cp = new CompressionParameters(sstableCompressionClass, parseChunkLength(chunkLength), options);
+
+        String sstableCompressionClass;
+
+        if (!removeEnabled(options))
+        {
+            sstableCompressionClass = null;
+
+            if (!options.isEmpty())
+                throw new ConfigurationException("If the '" + ENABLED + "' option is set to false"
+                                                  + " no other options must be specified");
+        }
+        else
+        {
+            sstableCompressionClass= removeSstableCompressionClass(options);
+        }
+
+        Integer chunkLength = removeChunkLength(options);
+
+        CompressionParameters cp = new CompressionParameters(sstableCompressionClass, chunkLength, options);
         cp.validate();
+
         return cp;
     }
 
-    public CompressionParameters(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
+    public static CompressionParameters noCompression()
     {
-        this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions);
+        return new CompressionParameters((ICompressor) null, DEFAULT_CHUNK_LENGTH, Collections.emptyMap());
     }
 
-    public CompressionParameters(ICompressor sstableCompressor)
+    public static CompressionParameters snappy()
     {
-        // can't try/catch as first statement in the constructor, thus repeating constructor code here.
-        this.sstableCompressor = sstableCompressor;
-        chunkLength = null;
-        otherOptions = Collections.emptyMap();
-        crcCheckChance = DEFAULT_CRC_CHECK_CHANCE;
+        return snappy(null);
+    }
+
+    public static CompressionParameters snappy(Integer chunkLength)
+    {
+        return new CompressionParameters(SnappyCompressor.instance, chunkLength, Collections.emptyMap());
+    }
+
+    public static CompressionParameters deflate()
+    {
+        return deflate(null);
+    }
+
+    public static CompressionParameters deflate(Integer chunkLength)
+    {
+        return new CompressionParameters(DeflateCompressor.instance, chunkLength, Collections.emptyMap());
     }
 
-    public CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
+    public static CompressionParameters lz4()
+    {
+        return lz4(null);
+    }
+
+    public static CompressionParameters lz4(Integer chunkLength)
+    {
+        return new CompressionParameters(LZ4Compressor.instance, chunkLength, Collections.emptyMap());
+    }
+
+    CompressionParameters(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
+    {
+        this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions);
+    }
+
+    private CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException
     {
         this.sstableCompressor = sstableCompressor;
         this.chunkLength = chunkLength;
-        this.otherOptions = otherOptions;
+        this.otherOptions = ImmutableMap.copyOf(otherOptions);
         String chance = otherOptions.get(CRC_CHECK_CHANCE);
         this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance);
     }
 
     public CompressionParameters copy()
     {
-        return new CompressionParameters(sstableCompressor, chunkLength, new HashMap<>(otherOptions));
+        return new CompressionParameters(sstableCompressor, chunkLength, otherOptions);
     }
 
     public void setLiveMetadata(final CFMetaData liveMetadata)
@@ -114,6 +169,29 @@ public class CompressionParameters
             liveMetadata.compressionParameters.setCrcCheckChance(crcCheckChance);
     }
 
+    /**
+     * Checks if compression is enabled.
+     * @return <code>true</code> if compression is enabled, <code>false</code> otherwise.
+     */
+    public boolean isEnabled()
+    {
+        return sstableCompressor != null;
+    }
+
+    /**
+     * Returns the SSTable compressor.
+     * @return the SSTable compressor or <code>null</code> if compression is disabled.
+     */
+    public ICompressor getSstableCompressor()
+    {
+        return sstableCompressor;
+    }
+
+    public ImmutableMap<String, String> getOtherOptions()
+    {
+        return otherOptions;
+    }
+
     public double getCrcCheckChance()
     {
         return liveMetadata == null ? this.crcCheckChance : liveMetadata.compressionParameters.crcCheckChance;
@@ -230,7 +308,7 @@ public class CompressionParameters
      * @return the chunk length in bytes
      * @throws ConfigurationException if the chunk size is too large
      */
-    public static Integer parseChunkLength(String chLengthKB) throws ConfigurationException
+    private static Integer parseChunkLength(String chLengthKB) throws ConfigurationException
     {
         if (chLengthKB == null)
             return null;
@@ -239,15 +317,124 @@ public class CompressionParameters
         {
             int parsed = Integer.parseInt(chLengthKB);
             if (parsed > Integer.MAX_VALUE / 1024)
-                throw new ConfigurationException("Value of " + CHUNK_LENGTH_KB + " is too large (" + parsed + ")");
+                throw new ConfigurationException("Value of " + CHUNK_LENGTH_IN_KB + " is too large (" + parsed + ")");
             return 1024 * parsed;
         }
         catch (NumberFormatException e)
         {
-            throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_KB, e);
+            throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_IN_KB, e);
         }
     }
 
+    /**
+     * Removes the chunk length option from the specified set of options.
+     *
+     * @param options the options
+     * @return the chunk length value
+     */
+    private static Integer removeChunkLength(Map<String, String> options)
+    {
+        if (options.containsKey(CHUNK_LENGTH_IN_KB))
+        {
+            if (options.containsKey(CHUNK_LENGTH_KB))
+            {
+                throw new ConfigurationException(String.format("The '%s' option must not be used if the chunk length is already specified by the '%s' option",
+                                                               CHUNK_LENGTH_KB,
+                                                               CHUNK_LENGTH_IN_KB));
+            }
+
+            return parseChunkLength(options.remove(CHUNK_LENGTH_IN_KB));
+        }
+
+        if (options.containsKey(CHUNK_LENGTH_KB))
+        {
+            if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning)
+            {
+                hasLoggedChunkLengthWarning = true;
+                LOGGER.warn(String.format("The %s option has been deprecated. You should use %s instead",
+                                          CHUNK_LENGTH_KB,
+                                          CHUNK_LENGTH_IN_KB));
+            }
+
+            return parseChunkLength(options.remove(CHUNK_LENGTH_KB));
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns <code>true</code> if the specified options contains the name of the compression class to be used,
+     * <code>false</code> otherwise.
+     *
+     * @param options the options
+     * @return <code>true</code> if the specified options contains the name of the compression class to be used,
+     * <code>false</code> otherwise.
+     */
+    public static boolean containsSstableCompressionClass(Map<String, String> options)
+    {
+        return options.containsKey(CLASS)
+                || options.containsKey(SSTABLE_COMPRESSION);
+    }
+
+    /**
+     * Removes the option specifying the name of the compression class
+     *
+     * @param options the options
+     * @return the name of the compression class
+     */
+    private static String removeSstableCompressionClass(Map<String, String> options)
+    {
+        if (options.containsKey(CLASS))
+        {
+            if (options.containsKey(SSTABLE_COMPRESSION))
+                throw new ConfigurationException(String.format("The '%s' option must not be used if the compression algorithm is already specified by the '%s' option",
+                                                               SSTABLE_COMPRESSION,
+                                                               CLASS));
+
+            String clazz = options.remove(CLASS);
+            if (clazz.isEmpty())
+                throw new ConfigurationException(String.format("The '%s' option must not be empty. To disable compression use 'enabled' : false", CLASS));
+
+            return clazz;
+        }
+
+        if (options.containsKey(SSTABLE_COMPRESSION) && !hasLoggedSsTableCompressionWarning)
+        {
+            hasLoggedSsTableCompressionWarning = true;
+            LOGGER.warn(String.format("The %s option has been deprecated. You should use %s instead",
+                                      SSTABLE_COMPRESSION,
+                                      CLASS));
+        }
+
+        return options.remove(SSTABLE_COMPRESSION);
+    }
+
+    /**
+     * Returns <code>true</code> if the options contains the <code>enabled</code> option and that its value is
+     * <code>true</code>, otherwise returns <code>false</code>.
+     *
+     * @param options the options
+     * @return <code>true</code> if the options contains the <code>enabled</code> option and that its value is
+     * <code>true</code>, otherwise returns <code>false</code>.
+     */
+    public static boolean isEnabled(Map<String, String> options)
+    {
+        String enabled = options.get(ENABLED);
+        return enabled == null || Boolean.parseBoolean(enabled);
+    }
+
+    /**
+     * Removes the <code>enabled</code> option from the specified options.
+     *
+     * @param options the options
+     * @return the value of the <code>enabled</code> option
+     */
+    private static boolean removeEnabled(Map<String, String> options)
+    {
+        String enabled = options.remove(ENABLED);
+        return enabled == null || Boolean.parseBoolean(enabled);
+    }
+
     // chunkLength must be a power of 2 because we assume so when
     // computing the chunk number from an uncompressed file offset (see
     // CompressedRandomAccessReader.decompresseChunk())
@@ -257,7 +444,7 @@ public class CompressionParameters
         if (chunkLength != null)
         {
             if (chunkLength <= 0)
-                throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_KB);
+                throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_IN_KB);
 
             int c = chunkLength;
             boolean found = false;
@@ -266,7 +453,7 @@ public class CompressionParameters
                 if ((c & 0x01) != 0)
                 {
                     if (found)
-                        throw new ConfigurationException(CHUNK_LENGTH_KB + " must be a power of 2");
+                        throw new ConfigurationException(CHUNK_LENGTH_IN_KB + " must be a power of 2");
                     else
                         found = true;
                 }
@@ -277,19 +464,18 @@ public class CompressionParameters
         validateCrcCheckChance(crcCheckChance);
     }
 
-    public Map<String, String> asThriftOptions()
+    public Map<String, String> asMap()
     {
-        Map<String, String> options = new HashMap<String, String>(otherOptions);
-        if (sstableCompressor == null)
-            return options;
+        if (!isEnabled())
+            return Collections.singletonMap(ENABLED, "false");
 
-        options.put(SSTABLE_COMPRESSION, sstableCompressor.getClass().getName());
-        if (chunkLength != null)
-            options.put(CHUNK_LENGTH_KB, chunkLengthInKB());
+        Map<String, String> options = new HashMap<String, String>(otherOptions);
+        options.put(CLASS, sstableCompressor.getClass().getName());
+        options.put(CHUNK_LENGTH_IN_KB, chunkLengthInKB());
         return options;
     }
 
-    private String chunkLengthInKB()
+    public String chunkLengthInKB()
     {
         return String.valueOf(chunkLength() / 1024);
     }
@@ -309,7 +495,7 @@ public class CompressionParameters
         CompressionParameters cp = (CompressionParameters) obj;
         return new EqualsBuilder()
             .append(sstableCompressor, cp.sstableCompressor)
-            .append(chunkLength, cp.chunkLength)
+            .append(chunkLength(), cp.chunkLength())
             .append(otherOptions, cp.otherOptions)
             .isEquals();
     }
@@ -319,7 +505,7 @@ public class CompressionParameters
     {
         return new HashCodeBuilder(29, 1597)
             .append(sstableCompressor)
-            .append(chunkLength)
+            .append(chunkLength())
             .append(otherOptions)
             .toHashCode();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index c3c69b3..900c948 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -135,7 +135,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         if (metadata.getBloomFilterFpChance() < 1.0)
             components.add(Component.FILTER);
 
-        if (metadata.compressionParameters().sstableCompressor != null)
+        if (metadata.compressionParameters().isEnabled())
         {
             components.add(Component.COMPRESSION_INFO);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index dc9e168..79243f4 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -315,7 +315,7 @@ public final class LegacySchemaMigrator
         if (tableRow.has("speculative_retry"))
             cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(tableRow.getString("speculative_retry")));
         cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(tableRow.getString("compaction_strategy_class")));
-        cfm.compressionParameters(CompressionParameters.create(fromJsonMap(tableRow.getString("compression_parameters"))));
+        cfm.compressionParameters(CompressionParameters.fromMap(fromJsonMap(tableRow.getString("compression_parameters"))));
         cfm.compactionStrategyOptions(fromJsonMap(tableRow.getString("compaction_strategy_options")));
 
         if (tableRow.has("min_index_interval"))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 337c4bf..979f973 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -828,7 +828,7 @@ public final class SchemaKeyspace
              .add("comment", table.getComment())
              .add("compaction_strategy_class", table.compactionStrategyClass.getName())
              .add("compaction_strategy_options", json(table.compactionStrategyOptions))
-             .add("compression_parameters", json(table.compressionParameters.asThriftOptions()))
+             .add("compression_parameters", json(table.compressionParameters.asMap()))
              .add("default_time_to_live", table.getDefaultTimeToLive())
              .add("gc_grace_seconds", table.getGcGraceSeconds())
              .add("key_validator", table.getKeyValidator().toString())
@@ -1041,7 +1041,7 @@ public final class SchemaKeyspace
         if (result.has("speculative_retry"))
             cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry")));
         cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class")));
-        cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
+        cfm.compressionParameters(CompressionParameters.fromMap(fromJsonMap(result.getString("compression_parameters"))));
         cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
 
         if (result.has("min_index_interval"))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd..099fd14 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -107,7 +107,7 @@ public class CompressedInputStream extends InputStream
     private void decompress(byte[] compressed) throws IOException
     {
         // uncompress
-        validBufferBytes = info.parameters.sstableCompressor.uncompress(compressed, 0, compressed.length - checksumBytes.length, buffer, 0);
+        validBufferBytes = info.parameters.getSstableCompressor().uncompress(compressed, 0, compressed.length - checksumBytes.length, buffer, 0);
         totalCompressedBytesRead += compressed.length;
 
         // validate crc randomly

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 2b210e9..0afc778 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -23,15 +23,19 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.compress.ICompressor;
 
 import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.CompactTables;
+import org.apache.cassandra.db.LegacyLayout;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
@@ -55,8 +59,6 @@ import org.apache.cassandra.utils.UUIDGen;
  */
 public class ThriftConversion
 {
-    private static final Logger logger = LoggerFactory.getLogger(ThriftConversion.class);
-
     public static org.apache.cassandra.db.ConsistencyLevel fromThrift(ConsistencyLevel cl)
     {
         switch (cl)
@@ -315,7 +317,7 @@ public class ThriftConversion
                 newCFMD.triggers(triggerDefinitionsFromThrift(cf_def.triggers));
 
             return newCFMD.comment(cf_def.comment)
-                          .compressionParameters(CompressionParameters.create(cf_def.compression_options));
+                          .compressionParameters(compressionParametersFromThrift(cf_def.compression_options));
         }
         catch (SyntaxException | MarshalException e)
         {
@@ -323,6 +325,13 @@ public class ThriftConversion
         }
     }
 
+    private static CompressionParameters compressionParametersFromThrift(Map<String, String> compression_options)
+    {
+        CompressionParameters compressionParameter = CompressionParameters.fromMap(compression_options);
+        compressionParameter.validate();
+        return compressionParameter;
+    }
+
     private static void addDefaultCQLMetadata(Collection<ColumnDefinition> defs,
                                               String ks,
                                               String cf,
@@ -456,7 +465,7 @@ public class ThriftConversion
         def.setColumn_metadata(columnDefinitionsToThrift(cfm, cfm.allColumns()));
         def.setCompaction_strategy(cfm.compactionStrategyClass.getName());
         def.setCompaction_strategy_options(new HashMap<>(cfm.compactionStrategyOptions));
-        def.setCompression_options(cfm.compressionParameters.asThriftOptions());
+        def.setCompression_options(compressionParametersToThrift(cfm.compressionParameters));
         def.setBloom_filter_fp_chance(cfm.getBloomFilterFpChance());
         def.setMin_index_interval(cfm.getMinIndexInterval());
         def.setMax_index_interval(cfm.getMaxIndexInterval());
@@ -564,4 +573,16 @@ public class ThriftConversion
         }
         return thriftDefs;
     }
+
+    public static Map<String, String> compressionParametersToThrift(CompressionParameters parameters)
+    {
+        if (!parameters.isEnabled())
+            return Collections.emptyMap();
+
+        Map<String, String> options = new HashMap<>(parameters.getOtherOptions());
+        Class<? extends ICompressor> klass = parameters.getSstableCompressor().getClass();
+        options.put(CompressionParameters.SSTABLE_COMPRESSION, klass.getName());
+        options.put(CompressionParameters.CHUNK_LENGTH_KB, parameters.chunkLengthInKB());
+        return options;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 86e4762..ce6ac22 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -327,7 +327,7 @@ public class SchemaLoader
     {
         for (KeyspaceMetadata ksm : schema)
             for (CFMetaData cfm : ksm.tables)
-                cfm.compressionParameters(new CompressionParameters(SnappyCompressor.instance));
+                cfm.compressionParameters(CompressionParameters.snappy());
     }
 
     public static CFMetaData counterCFMD(String ksName, String cfName)
@@ -462,9 +462,9 @@ public class SchemaLoader
     public static CompressionParameters getCompressionParameters(Integer chunkSize)
     {
         if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
-            return new CompressionParameters(SnappyCompressor.instance, chunkSize, Collections.<String, String>emptyMap());
-        else
-            return new CompressionParameters(null);
+            return CompressionParameters.snappy(chunkSize);
+
+        return CompressionParameters.noCompression();
     }
 
     public static void cleanupAndLeaveDirs() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
index c0808d2..fdbf132 100644
--- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
+++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java
@@ -126,7 +126,7 @@ public class CFMetaDataTest
 
                 // Testing with compression to catch #3558
                 CFMetaData withCompression = cfm.copy();
-                withCompression.compressionParameters(new CompressionParameters(SnappyCompressor.instance, 32768, new HashMap<String, String>()));
+                withCompression.compressionParameters(CompressionParameters.snappy(32768));
                 checkInverses(withCompression);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 66b6b33..e12794c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -24,9 +24,12 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.schema.SchemaKeyspace;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import static java.lang.String.format;
 import static org.junit.Assert.assertEquals;
 
 public class AlterTest extends CQLTester
@@ -208,4 +211,82 @@ public class AlterTest extends CQLTester
             assertInvalidSyntaxMessage("no viable alternative at input 'WITH'", stmt);
         }
     }
+
+    @Test
+    public void testAlterTableWithCompression() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"64\",\"class\":\"org.apache.cassandra.io.compress.LZ4Compressor\"}"));
+
+        execute("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+
+        execute("ALTER TABLE %s WITH compression = { 'sstable_compression' : 'LZ4Compressor', 'chunk_length_kb' : 64 };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"64\",\"class\":\"org.apache.cassandra.io.compress.LZ4Compressor\"}"));
+
+        execute("ALTER TABLE %s WITH compression = { 'sstable_compression' : '', 'chunk_length_kb' : 32 };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"enabled\":\"false\"}"));
+
+        execute("ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };");
+        execute("ALTER TABLE %s WITH compression = { 'enabled' : 'false'};");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"enabled\":\"false\"}"));
+
+        assertThrowsConfigurationException("Missing sub-option 'class' for the 'compression' option.",
+                                           "ALTER TABLE %s WITH  compression = {'chunk_length_in_kb' : 32};");
+
+        assertThrowsConfigurationException("The 'class' option must not be empty. To disable compression use 'enabled' : false",
+                                           "ALTER TABLE %s WITH  compression = { 'class' : ''};");
+
+        assertThrowsConfigurationException("If the 'enabled' option is set to false no other options must be specified",
+                                           "ALTER TABLE %s WITH compression = { 'enabled' : 'false', 'class' : 'SnappyCompressor'};");
+
+        assertThrowsConfigurationException("The 'sstable_compression' option must not be used if the compression algorithm is already specified by the 'class' option",
+                                           "ALTER TABLE %s WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'class' : 'SnappyCompressor'};");
+
+        assertThrowsConfigurationException("The 'chunk_length_kb' option must not be used if the chunk length is already specified by the 'chunk_length_in_kb' option",
+                                           "ALTER TABLE %s WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_kb' : 32 , 'chunk_length_in_kb' : 32 };");
+    }
+
+    private void assertThrowsConfigurationException(String errorMsg, String alterStmt) throws Throwable
+    {
+        try
+        {
+            execute(alterStmt);
+            Assert.fail("Query should be invalid but no error was thrown. Query is: " + alterStmt);
+        }
+        catch (ConfigurationException e)
+        {
+            assertEquals(errorMsg, e.getMessage());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index 053d291..5143480 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.cql3.validation.operations;
 
 import java.util.Collection;
@@ -24,6 +23,7 @@ import java.util.UUID;
 
 import org.junit.Test;
 
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CQLTester;
@@ -31,11 +31,15 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.triggers.ITrigger;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static java.lang.String.format;
 import static junit.framework.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
 
 public class CreateTest extends CQLTester
 {
@@ -501,6 +505,111 @@ public class CreateTest extends CQLTester
             assertInvalidSyntaxMessage("no viable alternative at input 'WITH'", stmt);
     }
 
+    @Test
+    public void testCreateTableWithCompression() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"64\",\"class\":\"org.apache.cassandra.io.compress.LZ4Compressor\"}"));
+
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32 };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_in_kb' : 32, 'enabled' : true };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'chunk_length_kb' : 32 };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"chunk_length_in_kb\":\"32\",\"class\":\"org.apache.cassandra.io.compress.SnappyCompressor\"}"));
+
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                + " WITH compression = { 'sstable_compression' : '', 'chunk_length_kb' : 32 };");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"enabled\":\"false\"}"));
+
+        createTable("CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                + " WITH compression = { 'enabled' : 'false'};");
+
+        assertRows(execute(format("SELECT compression_parameters FROM %s.%s WHERE keyspace_name = ? and table_name = ?;",
+                                  SchemaKeyspace.NAME,
+                                  SchemaKeyspace.TABLES),
+                           KEYSPACE,
+                           currentTable()),
+                   row("{\"enabled\":\"false\"}"));
+
+        assertThrowsConfigurationException("Missing sub-option 'class' for the 'compression' option.",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                           + " WITH compression = {'chunk_length_in_kb' : 32};");
+
+        assertThrowsConfigurationException("The 'class' option must not be empty. To disable compression use 'enabled' : false",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                           + " WITH compression = { 'class' : ''};");
+
+        assertThrowsConfigurationException("If the 'enabled' option is set to false no other options must be specified",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                           + " WITH compression = { 'enabled' : 'false', 'class' : 'SnappyCompressor'};");
+
+        assertThrowsConfigurationException("If the 'enabled' option is set to false no other options must be specified",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                           + " WITH compression = { 'enabled' : 'false', 'chunk_length_in_kb' : 32};");
+
+        assertThrowsConfigurationException("The 'sstable_compression' option must not be used if the compression algorithm is already specified by the 'class' option",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                           + " WITH compression = { 'sstable_compression' : 'SnappyCompressor', 'class' : 'SnappyCompressor'};");
+
+        assertThrowsConfigurationException("The 'chunk_length_kb' option must not be used if the chunk length is already specified by the 'chunk_length_in_kb' option",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                           + " WITH compression = { 'class' : 'SnappyCompressor', 'chunk_length_kb' : 32 , 'chunk_length_in_kb' : 32 };");
+
+        assertThrowsConfigurationException("Unknown compression options unknownOption",
+                                           "CREATE TABLE %s (a text, b int, c int, primary key (a, b))"
+                                            + " WITH compression = { 'class' : 'SnappyCompressor', 'unknownOption' : 32 };");
+    }
+
+    private void assertThrowsConfigurationException(String errorMsg, String createStmt) {
+        try
+        {
+            createTable(createStmt);
+            fail("Query should be invalid but no error was thrown. Query is: " + createStmt);
+        }
+        catch (RuntimeException e)
+        {
+            Throwable cause = e.getCause();
+            assertTrue("The exception should be a ConfigurationException", cause instanceof ConfigurationException);
+            assertEquals(errorMsg, cause.getMessage());
+        }
+    }
+
     private void assertTriggerExists(String name)
     {
         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), currentTable()).copy();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 6819e28..d2f933c 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -43,7 +43,6 @@ import org.junit.runner.RunWith;
 
 import java.io.*;
 import java.nio.file.Files;
-import java.util.HashMap;
 import java.util.zip.Adler32;
 import java.util.zip.CheckedInputStream;
 
@@ -71,7 +70,7 @@ public class VerifyTest
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
-        CompressionParameters compressionParameters = new CompressionParameters(SnappyCompressor.instance, 32768, new HashMap<String, String>());
+        CompressionParameters compressionParameters = CompressionParameters.snappy(32768);
 
         SchemaLoader.loadSchema();
         SchemaLoader.createKeyspace(KEYSPACE,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 8aa2fcd..3966342 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -68,7 +68,7 @@ public class CompressedRandomAccessReaderTest
         {
 
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
-            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
+            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParameters.snappy(32), sstableMetadataCollector);
 
             for (int i = 0; i < 20; i++)
                 writer.write("x".getBytes());
@@ -110,7 +110,7 @@ public class CompressedRandomAccessReaderTest
         {
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null);
             SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
+                ? new CompressedSequentialWriter(f, filename + ".metadata", CompressionParameters.snappy(), sstableMetadataCollector)
                 : SequentialWriter.open(f);
 
             writer.write("The quick ".getBytes());
@@ -162,7 +162,7 @@ public class CompressedRandomAccessReaderTest
         metadata.deleteOnExit();
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null);
-        try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
+        try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), CompressionParameters.snappy(), sstableMetadataCollector))
         {
             writer.write(CONTENT.getBytes());
             writer.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index aa26dc0..e5a7499 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.io.util.SequentialWriterTest;
 
 public class CompressedSequentialWriterTest extends SequentialWriterTest
 {
-    private ICompressor compressor;
+    private CompressionParameters compressionParameters;
 
     private void runTests(String testName) throws IOException
     {
@@ -60,21 +60,21 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
     @Test
     public void testLZ4Writer() throws IOException
     {
-        compressor = LZ4Compressor.instance;
+        compressionParameters = CompressionParameters.lz4();
         runTests("LZ4");
     }
 
     @Test
     public void testDeflateWriter() throws IOException
     {
-        compressor = DeflateCompressor.instance;
+        compressionParameters = CompressionParameters.deflate();
         runTests("Deflate");
     }
 
     @Test
     public void testSnappyWriter() throws IOException
     {
-        compressor = SnappyCompressor.instance;
+        compressionParameters = CompressionParameters.snappy();
         runTests("Snappy");
     }
 
@@ -89,7 +89,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
 
             byte[] dataPre = new byte[bytesToTest];
             byte[] rawPost = new byte[bytesToTest];
-            try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);)
+            try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, sstableMetadataCollector);)
             {
                 Random r = new Random();
 
@@ -143,7 +143,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
 
     private ByteBuffer makeBB(int size)
     {
-        return compressor.preferredBufferType().allocate(size);
+        return compressionParameters.getSstableCompressor().preferredBufferType().allocate(size);
     }
 
     private final List<TestableCSW> writers = new ArrayList<>();
@@ -177,7 +177,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         {
             this(file, offsetsFile, new CompressedSequentialWriter(file,
                                                                    offsetsFile.getPath(),
-                                                                   new CompressionParameters(LZ4Compressor.instance, BUFFER_SIZE, new HashMap<String, String>()),
+                                                                   CompressionParameters.lz4(BUFFER_SIZE),
                                                                    new MetadataCollector(new ClusteringComparator(UTF8Type.instance))));
         }
 

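For readers tracking the API side of this change rather than the individual test edits: the map-based CompressionParameters constructors used throughout the old tests are replaced by per-compressor factory methods. The sketch below only restates the calls visible in this patch's tests; the chunk-length argument being in bytes and the no-argument factories matching the old one-argument constructor's defaults are inferred from the replaced code rather than verified against the new class, and the standalone wrapper is purely illustrative (running it needs the Cassandra test classpath, where Snappy is available).

    import org.apache.cassandra.io.compress.CompressionParameters;

    public class CompressionParametersSketch
    {
        public static void main(String[] args)
        {
            // Factory methods with default options, replacing
            // new CompressionParameters(XxxCompressor.instance).
            CompressionParameters lz4 = CompressionParameters.lz4();
            CompressionParameters deflate = CompressionParameters.deflate();
            CompressionParameters snappy = CompressionParameters.snappy();

            // Explicit chunk length, replacing
            // new CompressionParameters(XxxCompressor.instance, chunkLength, options).
            CompressionParameters snappy32k = CompressionParameters.snappy(32768);
            CompressionParameters lz4Small = CompressionParameters.lz4(4096);

            // The configured compressor stays reachable, e.g. for buffer allocation.
            snappy32k.getSstableCompressor().preferredBufferType().allocate(32768);
        }
    }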
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index 1e1c799..4a6bb01 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.thrift.ThriftConversion;
 
 import static java.lang.String.format;
 import static junit.framework.Assert.assertEquals;
@@ -389,7 +390,7 @@ public class LegacySchemaMigratorTest
              .add("comment", table.getComment())
              .add("compaction_strategy_class", table.compactionStrategyClass.getName())
              .add("compaction_strategy_options", json(table.compactionStrategyOptions))
-             .add("compression_parameters", json(table.compressionParameters.asThriftOptions()))
+             .add("compression_parameters", json(ThriftConversion.compressionParametersToThrift(table.compressionParameters)))
              .add("default_time_to_live", table.getDefaultTimeToLive())
              .add("gc_grace_seconds", table.getGcGraceSeconds())
              .add("key_validator", table.getKeyValidator().toString())

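Related to the same refactoring, the legacy schema path no longer calls asThriftOptions() on the parameters themselves; the conversion back to the Thrift-era option names now lives in ThriftConversion. A minimal sketch of the call as used above; the Map return type, the class wrapper, and the assumption that 'table' is the CFMetaData being migrated are inferred from the single line in the diff, not from the new method's declaration.

    import java.util.Map;

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.thrift.ThriftConversion;

    final class LegacyCompressionOptionsSketch
    {
        // Mirrors the call in the migrator test; the returned map is assumed to use
        // the legacy keys such as 'sstable_compression' and 'chunk_length_kb'.
        static Map<String, String> legacyCompressionOptions(CFMetaData table)
        {
            return ThriftConversion.compressionParametersToThrift(table.compressionParameters);
        }
    }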
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 715799b..0545cc4 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.schema;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.HashMap;
 import java.util.HashSet;
 
 import org.apache.cassandra.SchemaLoader;
@@ -127,7 +126,7 @@ public class SchemaKeyspaceTest
 
                 // Testing with compression to catch #3558
                 CFMetaData withCompression = cfm.copy();
-                withCompression.compressionParameters(new CompressionParameters(SnappyCompressor.instance, 32768, new HashMap<>()));
+                withCompression.compressionParameters(CompressionParameters.snappy(32768));
                 checkInverses(withCompression);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056115ff/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index c4548c6..132abaa 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -66,7 +66,7 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
+        CompressionParameters param = CompressionParameters.snappy(32);
         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();
         for (long l = 0L; l < 1000; l++)