You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/04 11:14:44 UTC

[5/5] cassandra git commit: Factor out TableParams from CFMetaData

Factor out TableParams from CFMetaData

patch by Aleksey Yeschenko; reviewed by Robert Stupp for CASSANDRA-9712


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b31845c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b31845c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b31845c4

Branch: refs/heads/cassandra-3.0
Commit: b31845c4a7982358a7c5bfd9bcf572fda6c1bfa9
Parents: 6932bd8
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sat Jul 18 01:59:00 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Aug 4 12:12:34 2015 +0300

----------------------------------------------------------------------
 NEWS.txt                                        |   8 +-
 .../apache/cassandra/cache/CachingOptions.java  | 291 ----------
 .../org/apache/cassandra/config/CFMetaData.java | 540 +++++------------
 src/java/org/apache/cassandra/cql3/Cql.g        |  12 +-
 .../AlterMaterializedViewStatement.java         |  16 +-
 .../cql3/statements/AlterTableStatement.java    |  19 +-
 .../cassandra/cql3/statements/CFPropDefs.java   | 222 -------
 .../cassandra/cql3/statements/CFProperties.java |   4 +-
 .../cql3/statements/CreateTableStatement.java   |  56 +-
 .../cql3/statements/KeyspaceAttributes.java     |  36 +-
 .../cql3/statements/TableAttributes.java        | 153 +++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  71 +--
 .../cassandra/db/HintedHandOffManager.java      |   2 +-
 .../org/apache/cassandra/db/LivenessInfo.java   |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   2 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   2 +-
 .../db/SinglePartitionReadCommand.java          |   4 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  21 +-
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogArchiver.java         |   5 +-
 .../db/commitlog/CommitLogReplayer.java         |   4 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../compaction/CompactionStrategyManager.java   |  36 +-
 .../DateTieredCompactionStrategy.java           |  12 +-
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../SizeTieredCompactionStrategy.java           |  14 +-
 .../cassandra/db/compaction/Upgrader.java       |   2 +-
 .../apache/cassandra/db/rows/BufferCell.java    |   4 +-
 .../cassandra/db/view/MaterializedView.java     |   5 +-
 .../dht/OrderPreservingPartitioner.java         |   2 +-
 .../apache/cassandra/hadoop/ConfigHelper.java   |   6 +-
 .../io/compress/CompressedSequentialWriter.java |   4 +-
 .../io/compress/CompressionMetadata.java        |  15 +-
 .../io/compress/CompressionParameters.java      | 564 ------------------
 .../cassandra/io/compress/LZ4Compressor.java    |   3 +-
 .../io/sstable/IndexSummaryManager.java         |   4 +-
 .../io/sstable/format/SSTableReader.java        |  26 +-
 .../io/sstable/format/SSTableWriter.java        |   6 +-
 .../io/sstable/format/big/BigTableWriter.java   |   6 +-
 .../cassandra/io/util/SequentialWriter.java     |   4 +-
 .../apache/cassandra/schema/CachingParams.java  | 196 +++++++
 .../cassandra/schema/CompactionParams.java      | 304 ++++++++++
 .../cassandra/schema/CompressionParams.java     | 579 +++++++++++++++++++
 .../apache/cassandra/schema/KeyspaceParams.java |  95 +--
 .../cassandra/schema/LegacySchemaMigrator.java  | 122 ++--
 .../cassandra/schema/ReplicationParams.java     | 106 ++++
 .../apache/cassandra/schema/SchemaKeyspace.java | 142 ++---
 .../cassandra/schema/SpeculativeRetryParam.java | 160 +++++
 .../apache/cassandra/schema/TableParams.java    | 338 +++++++++++
 .../cassandra/service/AbstractReadExecutor.java |   9 +-
 .../apache/cassandra/service/CacheService.java  |   2 +-
 .../cassandra/service/StorageService.java       |   2 +-
 .../streaming/compress/CompressionInfo.java     |  12 +-
 .../cassandra/thrift/CassandraServer.java       |   7 +-
 .../cassandra/thrift/ThriftConversion.java      | 181 ++++--
 .../org/apache/cassandra/utils/FBUtilities.java |   8 +-
 .../utils/NativeSSTableLoaderClient.java        |   2 +-
 .../db/compaction/LongCompactionsTest.java      |  10 +-
 .../LongLeveledCompactionStrategyTest.java      |   4 +-
 test/unit/org/apache/cassandra/MockSchema.java  |   4 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  50 +-
 .../apache/cassandra/config/CFMetaDataTest.java |   4 +-
 .../validation/entities/SecondaryIndexTest.java |   2 +-
 .../miscellaneous/CrcCheckChanceTest.java       |  12 +-
 .../validation/miscellaneous/OverflowTest.java  |   8 +-
 .../cql3/validation/operations/AlterTest.java   |  14 +-
 .../SelectOrderedPartitionerTest.java           |   2 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |   2 -
 .../apache/cassandra/db/RangeTombstoneTest.java |   5 -
 .../apache/cassandra/db/RowCacheCQLTest.java    |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |   7 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   2 +-
 .../org/apache/cassandra/db/VerifyTest.java     |  12 +-
 .../db/commitlog/CommitLogUpgradeTest.java      |   2 +-
 .../db/compaction/AntiCompactionTest.java       |   1 -
 .../compaction/BlacklistingCompactionsTest.java |   4 +-
 .../db/compaction/CompactionsPurgeTest.java     |   7 +-
 .../db/compaction/CompactionsTest.java          |  11 +-
 .../LeveledCompactionStrategyTest.java          |   9 +-
 .../db/compaction/OneCompactionTest.java        |   9 +-
 .../SizeTieredCompactionStrategyTest.java       |   5 +-
 .../db/view/MaterializedViewUtilsTest.java      |   6 +-
 .../CompressedRandomAccessReaderTest.java       |  12 +-
 .../CompressedSequentialWriterTest.java         |  17 +-
 .../io/sstable/IndexSummaryManagerTest.java     |  49 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   6 +-
 .../org/apache/cassandra/schema/DefsTest.java   |  23 +-
 .../schema/LegacySchemaMigratorTest.java        |  61 +-
 .../cassandra/schema/SchemaKeyspaceTest.java    |   6 +-
 .../service/StorageServiceServerTest.java       |  13 +-
 .../compression/CompressedInputStreamTest.java  |   5 +-
 91 files changed, 2625 insertions(+), 2219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0e0d7c4..1fcbb12 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -21,10 +21,10 @@ New features
    - Materialized Views, which allow for server-side denormalization, is now
      available. Materialized views provide an alternative to secondary indexes
      for non-primary key queries, and perform much better for indexing high
-     cardinality columns. 
-     See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views 
+     cardinality columns.
+     See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
+
 
-   
 Upgrading
 ---------
    - 3.0 requires Java 8u20 or later.
@@ -56,6 +56,8 @@ Upgrading
    - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
      The new options are `class` and `chunk_length_in_kb`. Disabling compression should now
      be done by setting the new option `enabled` to `false`.
+   - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
+     has been deprecated since 2.1.0 and is being removed in 3.0.0.
 
 
 2.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cache/CachingOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CachingOptions.java b/src/java/org/apache/cassandra/cache/CachingOptions.java
deleted file mode 100644
index 686f365..0000000
--- a/src/java/org/apache/cassandra/cache/CachingOptions.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cache;
-
-import java.util.*;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
-
-import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
-
-/*
- * CQL: { 'keys' : 'ALL|NONE', 'rows_per_partition': '200|NONE|ALL' }
- */
-public class CachingOptions
-{
-    public static final CachingOptions KEYS_ONLY = new CachingOptions(new KeyCache(KeyCache.Type.ALL), new RowCache(RowCache.Type.NONE));
-    public static final CachingOptions ALL = new CachingOptions(new KeyCache(KeyCache.Type.ALL), new RowCache(RowCache.Type.ALL));
-    public static final CachingOptions ROWS_ONLY = new CachingOptions(new KeyCache(KeyCache.Type.NONE), new RowCache(RowCache.Type.ALL));
-    public static final CachingOptions NONE = new CachingOptions(new KeyCache(KeyCache.Type.NONE), new RowCache(RowCache.Type.NONE));
-
-    public final KeyCache keyCache;
-    public final RowCache rowCache;
-    private static final Set<String> legacyOptions = new HashSet<>(Arrays.asList("ALL", "NONE", "KEYS_ONLY", "ROWS_ONLY"));
-
-    public CachingOptions(KeyCache kc, RowCache rc)
-    {
-        this.keyCache = kc;
-        this.rowCache = rc;
-    }
-
-    public static CachingOptions fromString(String cache) throws ConfigurationException
-    {
-        if (legacyOptions.contains(cache.toUpperCase()))
-            return fromLegacyOption(cache.toUpperCase());
-        return fromMap(fromJsonMap(cache));
-    }
-
-    public static CachingOptions fromMap(Map<String, String> cacheConfig) throws ConfigurationException
-    {
-        validateCacheConfig(cacheConfig);
-        if (!cacheConfig.containsKey("keys") && !cacheConfig.containsKey("rows_per_partition"))
-            return CachingOptions.NONE;
-        if (!cacheConfig.containsKey("keys"))
-            return new CachingOptions(new KeyCache(KeyCache.Type.NONE), RowCache.fromString(cacheConfig.get("rows_per_partition")));
-        if (!cacheConfig.containsKey("rows_per_partition"))
-            return CachingOptions.KEYS_ONLY;
-
-        return new CachingOptions(KeyCache.fromString(cacheConfig.get("keys")), RowCache.fromString(cacheConfig.get("rows_per_partition")));
-    }
-
-    public Map<String, String> asMap()
-    {
-        Map<String, String> map = new HashMap<>(2);
-        map.put("keys", keyCache.toString());
-        map.put("rows_per_partition", rowCache.toString());
-        return map;
-    }
-
-    private static void validateCacheConfig(Map<String, String> cacheConfig) throws ConfigurationException
-    {
-        for (Map.Entry<String, String> entry : cacheConfig.entrySet())
-        {
-            String value = entry.getValue().toUpperCase();
-            if (entry.getKey().equals("keys"))
-            {
-                if (!(value.equals("ALL") || value.equals("NONE")))
-                {
-                    throw new ConfigurationException("'keys' can only have values 'ALL' or 'NONE', but was '" + value + "'");
-                }
-            }
-            else if (entry.getKey().equals("rows_per_partition"))
-            {
-                if (!(value.equals("ALL") || value.equals("NONE") || StringUtils.isNumeric(value)))
-                {
-                    throw new ConfigurationException("'rows_per_partition' can only have values 'ALL', 'NONE' or be numeric, but was '" + value + "'.");
-                }
-            }
-            else
-                throw new ConfigurationException("Only supported CachingOptions parameters are 'keys' and 'rows_per_partition', but was '" + entry.getKey() + "'");
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("{\"keys\":\"%s\", \"rows_per_partition\":\"%s\"}", keyCache.toString(), rowCache.toString());
-    }
-
-    private static CachingOptions fromLegacyOption(String cache)
-    {
-        if (cache.equals("ALL"))
-            return ALL;
-        if (cache.equals("KEYS_ONLY"))
-            return KEYS_ONLY;
-        if (cache.equals("ROWS_ONLY"))
-            return ROWS_ONLY;
-        return NONE;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        CachingOptions o2 = (CachingOptions) o;
-
-        if (!keyCache.equals(o2.keyCache)) return false;
-        if (!rowCache.equals(o2.rowCache)) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = keyCache.hashCode();
-        result = 31 * result + rowCache.hashCode();
-        return result;
-    }
-
-    // FIXME: move to ThriftConversion
-    public static CachingOptions fromThrift(String caching, String cellsPerRow) throws ConfigurationException
-    {
-
-        RowCache rc = new RowCache(RowCache.Type.NONE);
-        KeyCache kc = new KeyCache(KeyCache.Type.ALL);
-        // if we get a caching string from thrift it is legacy, "ALL", "KEYS_ONLY" etc, fromString handles those
-        if (caching != null)
-        {
-            CachingOptions givenOptions = CachingOptions.fromString(caching);
-            rc = givenOptions.rowCache;
-            kc = givenOptions.keyCache;
-        }
-        // if we get cells_per_row from thrift, it is either "ALL" or "<number of cells to cache>".
-        if (cellsPerRow != null && rc.isEnabled())
-            rc = RowCache.fromString(cellsPerRow);
-        return new CachingOptions(kc, rc);
-    }
-
-    // FIXME: move to ThriftConversion
-    public String toThriftCaching()
-    {
-        if (rowCache.isEnabled() && keyCache.isEnabled())
-            return "ALL";
-        if (rowCache.isEnabled())
-            return "ROWS_ONLY";
-        if (keyCache.isEnabled())
-            return "KEYS_ONLY";
-        return "NONE";
-    }
-
-    // FIXME: move to ThriftConversion
-    public String toThriftCellsPerRow()
-    {
-        if (rowCache.cacheFullPartitions())
-            return "ALL";
-        return String.valueOf(rowCache.rowsToCache);
-    }
-
-    public static class KeyCache
-    {
-        public final Type type;
-        public KeyCache(Type type)
-        {
-            this.type = type;
-        }
-
-        public enum Type
-        {
-            ALL, NONE
-        }
-        public static KeyCache fromString(String keyCache)
-        {
-            return new KeyCache(Type.valueOf(keyCache.toUpperCase()));
-        }
-
-        public boolean isEnabled()
-        {
-            return type == Type.ALL;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            KeyCache keyCache = (KeyCache) o;
-
-            if (type != keyCache.type) return false;
-
-            return true;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return type.hashCode();
-        }
-        @Override
-        public String toString()
-        {
-            return type.toString();
-        }
-    }
-
-    public static class RowCache
-    {
-        public final Type type;
-        public final int rowsToCache;
-
-        public RowCache(Type type)
-        {
-            this(type, (type == Type.ALL) ? Integer.MAX_VALUE : 0);
-        }
-        public RowCache(Type type, int rowsToCache)
-        {
-            this.type = type;
-            this.rowsToCache = rowsToCache;
-        }
-
-        public enum Type
-        {
-            ALL, NONE, HEAD
-        }
-
-        public static RowCache fromString(String rowCache)
-        {
-            if (rowCache == null || rowCache.equalsIgnoreCase("none"))
-                return new RowCache(Type.NONE, 0);
-            else if (rowCache.equalsIgnoreCase("all"))
-                return new RowCache(Type.ALL, Integer.MAX_VALUE);
-            return new RowCache(Type.HEAD, Integer.parseInt(rowCache));
-        }
-        public boolean isEnabled()
-        {
-            return (type == Type.ALL) || (type == Type.HEAD);
-        }
-        public boolean cacheFullPartitions()
-        {
-            return type == Type.ALL;
-        }
-        @Override
-        public String toString()
-        {
-            if (type == Type.ALL) return "ALL";
-            if (type == Type.NONE) return "NONE";
-            return String.valueOf(rowsToCache);
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            RowCache rowCache = (RowCache) o;
-
-            if (rowsToCache != rowCache.rowsToCache) return false;
-            if (type != rowCache.type) return false;
-
-            return true;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            int result = type.hashCode();
-            result = 31 * result + rowsToCache;
-            return result;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 43c95ea..0982109 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -24,12 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import com.google.common.base.Strings;
 import com.google.common.collect.*;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
@@ -37,7 +37,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CFStatement;
@@ -48,13 +47,9 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.schema.MaterializedViews;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.schema.Triggers;
+import org.apache.cassandra.schema.*;
 import org.apache.cassandra.utils.*;
 import org.github.jamm.Unmetered;
 
@@ -66,113 +61,13 @@ public final class CFMetaData
 {
     public enum Flag
     {
-        SUPER, COUNTER, DENSE, COMPOUND, MATERIALIZEDVIEW
+        SUPER, COUNTER, DENSE, COMPOUND, VIEW
     }
 
     private static final Logger logger = LoggerFactory.getLogger(CFMetaData.class);
 
     public static final Serializer serializer = new Serializer();
 
-    public final static double DEFAULT_READ_REPAIR_CHANCE = 0.0;
-    public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.1;
-    public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
-    public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
-    public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
-    public final static Class<? extends AbstractCompactionStrategy> DEFAULT_COMPACTION_STRATEGY_CLASS = SizeTieredCompactionStrategy.class;
-    public final static CachingOptions DEFAULT_CACHING_STRATEGY = CachingOptions.KEYS_ONLY;
-    public final static int DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
-    public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
-    public final static int DEFAULT_MIN_INDEX_INTERVAL = 128;
-    public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048;
-
-    // Note that this is the default only for user created tables
-    public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
-
-    // Note that this need to come *before* any CFMetaData is defined so before the compile below.
-    private static final Comparator<ColumnDefinition> regularColumnComparator = new Comparator<ColumnDefinition>()
-    {
-        public int compare(ColumnDefinition def1, ColumnDefinition def2)
-        {
-            return ByteBufferUtil.compareUnsigned(def1.name.bytes, def2.name.bytes);
-        }
-    };
-
-    public static class SpeculativeRetry
-    {
-        public enum RetryType
-        {
-            NONE, CUSTOM, PERCENTILE, ALWAYS
-        }
-
-        public final RetryType type;
-        public final double value;
-
-        private SpeculativeRetry(RetryType type, double value)
-        {
-            this.type = type;
-            this.value = value;
-        }
-
-        public static SpeculativeRetry fromString(String retry) throws ConfigurationException
-        {
-            String name = retry.toUpperCase();
-            try
-            {
-                if (name.endsWith(RetryType.PERCENTILE.toString()))
-                {
-                    double value = Double.parseDouble(name.substring(0, name.length() - 10));
-                    if (value > 100 || value < 0)
-                        throw new ConfigurationException("PERCENTILE should be between 0 and 100, but was " + value);
-                    return new SpeculativeRetry(RetryType.PERCENTILE, (value / 100));
-                }
-                else if (name.endsWith("MS"))
-                {
-                    double value = Double.parseDouble(name.substring(0, name.length() - 2));
-                    return new SpeculativeRetry(RetryType.CUSTOM, value);
-                }
-                else
-                {
-                    return new SpeculativeRetry(RetryType.valueOf(name), 0);
-                }
-            }
-            catch (IllegalArgumentException e)
-            {
-                // ignore to throw the below exception.
-            }
-            throw new ConfigurationException("invalid speculative_retry type: " + retry);
-        }
-
-        @Override
-        public boolean equals(Object obj)
-        {
-            if (!(obj instanceof SpeculativeRetry))
-                return false;
-            SpeculativeRetry rhs = (SpeculativeRetry) obj;
-            return Objects.equal(type, rhs.type) && Objects.equal(value, rhs.value);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hashCode(type, value);
-        }
-
-        @Override
-        public String toString()
-        {
-            switch (type)
-            {
-            case PERCENTILE:
-                // TODO switch to BigDecimal so round-tripping isn't lossy
-                return (value * 100) + "PERCENTILE";
-            case CUSTOM:
-                return value + "ms";
-            default:
-                return type.toString();
-            }
-        }
-    }
-
     //REQUIRED
     public final UUID cfId;                           // internal id, never exposed to user
     public final String ksName;                       // name of keyspace
@@ -192,21 +87,10 @@ public final class CFMetaData
 
     private final Serializers serializers;
 
-    //OPTIONAL
-    private volatile String comment = "";
-    private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
-    private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
-    private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
+    // non-final, for now
+    public volatile TableParams params = TableParams.DEFAULT;
+
     private volatile AbstractType<?> keyValidator = BytesType.instance;
-    private volatile int minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD;
-    private volatile int maxCompactionThreshold = DEFAULT_MAX_COMPACTION_THRESHOLD;
-    private volatile Double bloomFilterFpChance = null;
-    private volatile CachingOptions caching = DEFAULT_CACHING_STRATEGY;
-    private volatile int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL;
-    private volatile int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL;
-    private volatile int memtableFlushPeriod = 0;
-    private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
-    private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
     private volatile Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
     private volatile Triggers triggers = Triggers.none();
     private volatile MaterializedViews materializedViews = MaterializedViews.none();
@@ -228,31 +112,110 @@ public final class CFMetaData
     // for those tables in practice).
     private volatile ColumnDefinition compactValueColumn;
 
-    public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
-    public volatile Map<String, String> compactionStrategyOptions = new HashMap<>();
-
-    public volatile CompressionParameters compressionParameters = CompressionParameters.noCompression();
-
-    // attribute setters that return the modified CFMetaData instance
-    public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;}
-    public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;}
-    public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;}
-    public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;}
-    public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;}
-    public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;}
-    public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;}
-    public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;}
-    public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
-    public CFMetaData bloomFilterFpChance(double prop) {bloomFilterFpChance = prop; return this;}
-    public CFMetaData caching(CachingOptions prop) {caching = prop; return this;}
-    public CFMetaData minIndexInterval(int prop) {minIndexInterval = prop; return this;}
-    public CFMetaData maxIndexInterval(int prop) {maxIndexInterval = prop; return this;}
-    public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
-    public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
-    public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
-    public CFMetaData droppedColumns(Map<ByteBuffer, DroppedColumn> cols) {droppedColumns = cols; return this;}
-    public CFMetaData triggers(Triggers prop) {triggers = prop; return this;}
-    public CFMetaData materializedViews(MaterializedViews prop) {materializedViews = prop; return this;}
+    /*
+     * All of these methods will go away once CFMetaData becomes completely immutable.
+     */
+    public CFMetaData params(TableParams params)
+    {
+        this.params = params;
+        return this;
+    }
+
+    public CFMetaData bloomFilterFpChance(double prop)
+    {
+        params = TableParams.builder(params).bloomFilterFpChance(prop).build();
+        return this;
+    }
+
+    public CFMetaData caching(CachingParams prop)
+    {
+        params = TableParams.builder(params).caching(prop).build();
+        return this;
+    }
+
+    public CFMetaData comment(String prop)
+    {
+        params = TableParams.builder(params).comment(prop).build();
+        return this;
+    }
+
+    public CFMetaData compaction(CompactionParams prop)
+    {
+        params = TableParams.builder(params).compaction(prop).build();
+        return this;
+    }
+
+    public CFMetaData compression(CompressionParams prop)
+    {
+        params = TableParams.builder(params).compression(prop).build();
+        return this;
+    }
+
+    public CFMetaData dcLocalReadRepairChance(double prop)
+    {
+        params = TableParams.builder(params).dcLocalReadRepairChance(prop).build();
+        return this;
+    }
+
+    public CFMetaData defaultTimeToLive(int prop)
+    {
+        params = TableParams.builder(params).defaultTimeToLive(prop).build();
+        return this;
+    }
+
+    public CFMetaData gcGraceSeconds(int prop)
+    {
+        params = TableParams.builder(params).gcGraceSeconds(prop).build();
+        return this;
+    }
+
+    public CFMetaData maxIndexInterval(int prop)
+    {
+        params = TableParams.builder(params).maxIndexInterval(prop).build();
+        return this;
+    }
+
+    public CFMetaData memtableFlushPeriod(int prop)
+    {
+        params = TableParams.builder(params).memtableFlushPeriodInMs(prop).build();
+        return this;
+    }
+
+    public CFMetaData minIndexInterval(int prop)
+    {
+        params = TableParams.builder(params).minIndexInterval(prop).build();
+        return this;
+    }
+
+    public CFMetaData readRepairChance(double prop)
+    {
+        params = TableParams.builder(params).readRepairChance(prop).build();
+        return this;
+    }
+
+    public CFMetaData speculativeRetry(SpeculativeRetryParam prop)
+    {
+        params = TableParams.builder(params).speculativeRetry(prop).build();
+        return this;
+    }
+
+    public CFMetaData droppedColumns(Map<ByteBuffer, DroppedColumn> cols)
+    {
+        droppedColumns = cols;
+        return this;
+    }
+
+    public CFMetaData triggers(Triggers prop)
+    {
+        triggers = prop;
+        return this;
+    }
+
+    public CFMetaData materializedViews(MaterializedViews prop)
+    {
+        materializedViews = prop;
+        return this;
+    }
 
     private CFMetaData(String keyspace,
                        String name,
@@ -287,7 +250,7 @@ public final class CFMetaData
         if (isCompound)
             flags.add(Flag.COMPOUND);
         if (isMaterializedView)
-            flags.add(Flag.MATERIALIZEDVIEW);
+            flags.add(Flag.VIEW);
         this.flags = Sets.immutableEnumSet(flags);
 
         isIndex = cfName.contains(".");
@@ -414,15 +377,15 @@ public final class CFMetaData
         CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(cql);
         parsed.prepareKeyspace(keyspace);
         CreateTableStatement statement = (CreateTableStatement) parsed.prepare().statement;
-        CFMetaData.Builder builder = statement.metadataBuilder();
-        builder.withId(generateLegacyCfId(keyspace, statement.columnFamily()));
-        CFMetaData cfm = builder.build();
-        statement.applyPropertiesTo(cfm);
 
-        return cfm.readRepairChance(0)
-                  .dcLocalReadRepairChance(0)
-                  .gcGraceSeconds(0)
-                  .memtableFlushPeriod(3600 * 1000);
+        return statement.metadataBuilder()
+                        .withId(generateLegacyCfId(keyspace, statement.columnFamily()))
+                        .build()
+                        .params(statement.params())
+                        .readRepairChance(0.0)
+                        .dcLocalReadRepairChance(0.0)
+                        .gcGraceSeconds(0)
+                        .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1));
     }
 
     /**
@@ -438,22 +401,20 @@ public final class CFMetaData
 
     public CFMetaData reloadIndexMetadataProperties(CFMetaData parent)
     {
+        TableParams.Builder indexParams = TableParams.builder(parent.params);
+
         // Depends on parent's cache setting, turn on its index CF's cache.
         // Row caching is never enabled; see CASSANDRA-5732
-        CachingOptions indexCaching = parent.getCaching().keyCache.isEnabled()
-                                    ? CachingOptions.KEYS_ONLY
-                                    : CachingOptions.NONE;
+        if (parent.params.caching.cacheKeys())
+            indexParams.caching(CachingParams.CACHE_KEYS);
+        else
+            indexParams.caching(CachingParams.CACHE_NOTHING);
 
-        return this.readRepairChance(0.0)
+        indexParams.readRepairChance(0.0)
                    .dcLocalReadRepairChance(0.0)
-                   .gcGraceSeconds(0)
-                   .caching(indexCaching)
-                   .speculativeRetry(parent.speculativeRetry)
-                   .minCompactionThreshold(parent.minCompactionThreshold)
-                   .maxCompactionThreshold(parent.maxCompactionThreshold)
-                   .compactionStrategyClass(parent.compactionStrategyClass)
-                   .compactionStrategyOptions(parent.compactionStrategyOptions)
-                   .compressionParameters(parent.compressionParameters);
+                   .gcGraceSeconds(0);
+
+        return params(indexParams.build());
     }
 
     public CFMetaData copy()
@@ -520,22 +481,7 @@ public final class CFMetaData
     @VisibleForTesting
     public static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
     {
-        return newCFMD.comment(oldCFMD.comment)
-                      .readRepairChance(oldCFMD.readRepairChance)
-                      .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance)
-                      .gcGraceSeconds(oldCFMD.gcGraceSeconds)
-                      .minCompactionThreshold(oldCFMD.minCompactionThreshold)
-                      .maxCompactionThreshold(oldCFMD.maxCompactionThreshold)
-                      .compactionStrategyClass(oldCFMD.compactionStrategyClass)
-                      .compactionStrategyOptions(new HashMap<>(oldCFMD.compactionStrategyOptions))
-                      .compressionParameters(oldCFMD.compressionParameters.copy())
-                      .bloomFilterFpChance(oldCFMD.getBloomFilterFpChance())
-                      .caching(oldCFMD.caching)
-                      .defaultTimeToLive(oldCFMD.defaultTimeToLive)
-                      .minIndexInterval(oldCFMD.minIndexInterval)
-                      .maxIndexInterval(oldCFMD.maxIndexInterval)
-                      .speculativeRetry(oldCFMD.speculativeRetry)
-                      .memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
+        return newCFMD.params(oldCFMD.params)
                       .droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
                       .triggers(oldCFMD.triggers)
                       .materializedViews(oldCFMD.materializedViews);
@@ -555,11 +501,6 @@ public final class CFMetaData
         return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name.bytes) : info.getIndexName());
     }
 
-    public String getComment()
-    {
-        return comment;
-    }
-
     /**
      * The '.' char is the only way to identify if the CFMetadata is for a secondary index
      */
@@ -595,23 +536,13 @@ public final class CFMetaData
         return isIndex ? cfName.substring(0, cfName.indexOf('.')) : null;
     }
 
-    public double getReadRepairChance()
-    {
-        return readRepairChance;
-    }
-
-    public double getDcLocalReadRepairChance()
-    {
-        return dcLocalReadRepairChance;
-    }
-
     public ReadRepairDecision newReadRepairDecision()
     {
         double chance = ThreadLocalRandom.current().nextDouble();
-        if (getReadRepairChance() > chance)
+        if (params.readRepairChance > chance)
             return ReadRepairDecision.GLOBAL;
 
-        if (getDcLocalReadRepairChance() > chance)
+        if (params.dcLocalReadRepairChance > chance)
             return ReadRepairDecision.DC_LOCAL;
 
         return ReadRepairDecision.NONE;
@@ -624,31 +555,11 @@ public final class CFMetaData
              : UTF8Type.instance;
     }
 
-    public int getGcGraceSeconds()
-    {
-        return gcGraceSeconds;
-    }
-
     public AbstractType<?> getKeyValidator()
     {
         return keyValidator;
     }
 
-    public int getMinCompactionThreshold()
-    {
-        return minCompactionThreshold;
-    }
-
-    public int getMaxCompactionThreshold()
-    {
-        return maxCompactionThreshold;
-    }
-
-    public CompressionParameters compressionParameters()
-    {
-        return compressionParameters;
-    }
-
     public Collection<ColumnDefinition> allColumns()
     {
         return columnMetadata.values();
@@ -728,44 +639,6 @@ public final class CFMetaData
         return CompositeType.build(values);
     }
 
-    public double getBloomFilterFpChance()
-    {
-        // we disallow bFFPC==null starting in 1.2.1 but tolerated it before that
-        return (bloomFilterFpChance == null || bloomFilterFpChance == 0)
-               ? compactionStrategyClass == LeveledCompactionStrategy.class ? 0.1 : 0.01
-               : bloomFilterFpChance;
-    }
-
-    public CachingOptions getCaching()
-    {
-        return caching;
-    }
-
-    public int getMinIndexInterval()
-    {
-        return minIndexInterval;
-    }
-
-    public int getMaxIndexInterval()
-    {
-        return maxIndexInterval;
-    }
-
-    public SpeculativeRetry getSpeculativeRetry()
-    {
-        return speculativeRetry;
-    }
-
-    public int getMemtableFlushPeriod()
-    {
-        return memtableFlushPeriod;
-    }
-
-    public int getDefaultTimeToLive()
-    {
-        return defaultTimeToLive;
-    }
-
     public Map<ByteBuffer, DroppedColumn> getDroppedColumns()
     {
         return droppedColumns;
@@ -803,25 +676,10 @@ public final class CFMetaData
             && Objects.equal(flags, other.flags)
             && Objects.equal(ksName, other.ksName)
             && Objects.equal(cfName, other.cfName)
+            && Objects.equal(params, other.params)
             && Objects.equal(comparator, other.comparator)
-            && Objects.equal(comment, other.comment)
-            && Objects.equal(readRepairChance, other.readRepairChance)
-            && Objects.equal(dcLocalReadRepairChance, other.dcLocalReadRepairChance)
-            && Objects.equal(gcGraceSeconds, other.gcGraceSeconds)
             && Objects.equal(keyValidator, other.keyValidator)
-            && Objects.equal(minCompactionThreshold, other.minCompactionThreshold)
-            && Objects.equal(maxCompactionThreshold, other.maxCompactionThreshold)
             && Objects.equal(columnMetadata, other.columnMetadata)
-            && Objects.equal(compactionStrategyClass, other.compactionStrategyClass)
-            && Objects.equal(compactionStrategyOptions, other.compactionStrategyOptions)
-            && Objects.equal(compressionParameters, other.compressionParameters)
-            && Objects.equal(getBloomFilterFpChance(), other.getBloomFilterFpChance())
-            && Objects.equal(memtableFlushPeriod, other.memtableFlushPeriod)
-            && Objects.equal(caching, other.caching)
-            && Objects.equal(defaultTimeToLive, other.defaultTimeToLive)
-            && Objects.equal(minIndexInterval, other.minIndexInterval)
-            && Objects.equal(maxIndexInterval, other.maxIndexInterval)
-            && Objects.equal(speculativeRetry, other.speculativeRetry)
             && Objects.equal(droppedColumns, other.droppedColumns)
             && Objects.equal(triggers, other.triggers)
             && Objects.equal(materializedViews, other.materializedViews);
@@ -836,24 +694,9 @@ public final class CFMetaData
             .append(cfName)
             .append(flags)
             .append(comparator)
-            .append(comment)
-            .append(readRepairChance)
-            .append(dcLocalReadRepairChance)
-            .append(gcGraceSeconds)
+            .append(params)
             .append(keyValidator)
-            .append(minCompactionThreshold)
-            .append(maxCompactionThreshold)
             .append(columnMetadata)
-            .append(compactionStrategyClass)
-            .append(compactionStrategyOptions)
-            .append(compressionParameters)
-            .append(getBloomFilterFpChance())
-            .append(memtableFlushPeriod)
-            .append(caching)
-            .append(defaultTimeToLive)
-            .append(minIndexInterval)
-            .append(maxIndexInterval)
-            .append(speculativeRetry)
             .append(droppedColumns)
             .append(triggers)
             .append(materializedViews)
@@ -893,30 +736,13 @@ public final class CFMetaData
         // compaction thresholds are checked by ThriftValidation. We shouldn't be doing
         // validation on the apply path; it's too late for that.
 
-        comment = Strings.nullToEmpty(cfm.comment);
-        readRepairChance = cfm.readRepairChance;
-        dcLocalReadRepairChance = cfm.dcLocalReadRepairChance;
-        gcGraceSeconds = cfm.gcGraceSeconds;
-        keyValidator = cfm.keyValidator;
-        minCompactionThreshold = cfm.minCompactionThreshold;
-        maxCompactionThreshold = cfm.maxCompactionThreshold;
+        params = cfm.params;
 
-        bloomFilterFpChance = cfm.getBloomFilterFpChance();
-        caching = cfm.caching;
-        minIndexInterval = cfm.minIndexInterval;
-        maxIndexInterval = cfm.maxIndexInterval;
-        memtableFlushPeriod = cfm.memtableFlushPeriod;
-        defaultTimeToLive = cfm.defaultTimeToLive;
-        speculativeRetry = cfm.speculativeRetry;
+        keyValidator = cfm.keyValidator;
 
         if (!cfm.droppedColumns.isEmpty())
             droppedColumns = cfm.droppedColumns;
 
-        compactionStrategyClass = cfm.compactionStrategyClass;
-        compactionStrategyOptions = cfm.compactionStrategyOptions;
-
-        compressionParameters = cfm.compressionParameters;
-
         triggers = cfm.triggers;
         materializedViews = cfm.materializedViews;
 
@@ -945,37 +771,6 @@ public final class CFMetaData
             throw new ConfigurationException(String.format("Column family comparators do not match or are not compatible (found %s; expected %s).", cfm.comparator.getClass().getSimpleName(), comparator.getClass().getSimpleName()));
     }
 
-    public static void validateCompactionOptions(Class<? extends AbstractCompactionStrategy> strategyClass, Map<String, String> options) throws ConfigurationException
-    {
-        try
-        {
-            if (options == null)
-                return;
-
-            Map<?,?> unknownOptions = (Map) strategyClass.getMethod("validateOptions", Map.class).invoke(null, options);
-            if (!unknownOptions.isEmpty())
-                throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), strategyClass.getSimpleName()));
-        }
-        catch (NoSuchMethodException e)
-        {
-            logger.warn("Compaction Strategy {} does not have a static validateOptions method. Validation ignored", strategyClass.getName());
-        }
-        catch (InvocationTargetException e)
-        {
-            if (e.getTargetException() instanceof ConfigurationException)
-                throw (ConfigurationException) e.getTargetException();
-            throw new ConfigurationException("Failed to validate compaction options: " + options);
-        }
-        catch (ConfigurationException e)
-        {
-            throw e;
-        }
-        catch (Exception e)
-        {
-            throw new ConfigurationException("Failed to validate compaction options: " + options);
-        }
-    }
-
     public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
     {
         className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
@@ -991,8 +786,8 @@ public final class CFMetaData
         try
         {
             Constructor<? extends AbstractCompactionStrategy> constructor =
-                compactionStrategyClass.getConstructor(ColumnFamilyStore.class, Map.class);
-            return constructor.newInstance(cfs, compactionStrategyOptions);
+                params.compaction.klass().getConstructor(ColumnFamilyStore.class, Map.class);
+            return constructor.newInstance(cfs, params.compaction.options());
         }
         catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e)
         {
@@ -1094,6 +889,8 @@ public final class CFMetaData
         if (!isNameValid(cfName))
             throw new ConfigurationException(String.format("ColumnFamily name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", Schema.NAME_LENGTH, cfName));
 
+        params.validate();
+
         for (int i = 0; i < comparator.size(); i++)
         {
             if (comparator.subtype(i) instanceof CounterColumnType)
@@ -1147,13 +944,6 @@ public final class CFMetaData
             }
         }
 
-        validateCompactionThresholds();
-
-        if (bloomFilterFpChance != null && bloomFilterFpChance == 0)
-            throw new ConfigurationException("Zero false positives is impossible; bloom filter false positive chance bffpc must be 0 < bffpc <= 1");
-
-        validateIndexIntervalThresholds();
-
         return this;
     }
 
@@ -1167,32 +957,6 @@ public final class CFMetaData
         return indexNames;
     }
 
-    private void validateCompactionThresholds() throws ConfigurationException
-    {
-        if (maxCompactionThreshold == 0)
-        {
-            logger.warn("Disabling compaction by setting max or min compaction has been deprecated, " +
-                    "set the compaction strategy option 'enabled' to 'false' instead");
-            return;
-        }
-
-        if (minCompactionThreshold <= 1)
-            throw new ConfigurationException(String.format("Min compaction threshold cannot be less than 2 (got %d).", minCompactionThreshold));
-
-        if (minCompactionThreshold > maxCompactionThreshold)
-            throw new ConfigurationException(String.format("Min compaction threshold (got %d) cannot be greater than max compaction threshold (got %d)",
-                                                            minCompactionThreshold, maxCompactionThreshold));
-    }
-
-    private void validateIndexIntervalThresholds() throws ConfigurationException
-    {
-        if (minIndexInterval <= 0)
-            throw new ConfigurationException(String.format("Min index interval must be greater than 0 (got %d).", minIndexInterval));
-        if (maxIndexInterval < minIndexInterval)
-            throw new ConfigurationException(String.format("Max index interval (%d) must be greater than the min index " +
-                                                           "interval (%d).", maxIndexInterval, minIndexInterval));
-    }
-
     // The comparator to validate the definition name with thrift.
     public AbstractType<?> thriftColumnNameType()
     {
@@ -1207,13 +971,6 @@ public final class CFMetaData
         return clusteringColumns.get(0).type;
     }
 
-    public CFMetaData addAllColumnDefinitions(Collection<ColumnDefinition> defs)
-    {
-        for (ColumnDefinition def : defs)
-            addOrReplaceColumnDefinition(def);
-        return this;
-    }
-
     public CFMetaData addColumnDefinition(ColumnDefinition def) throws ConfigurationException
     {
         if (columnMetadata.containsKey(def.name.bytes))
@@ -1415,28 +1172,13 @@ public final class CFMetaData
             .append("ksName", ksName)
             .append("cfName", cfName)
             .append("flags", flags)
+            .append("params", params)
             .append("comparator", comparator)
             .append("partitionColumns", partitionColumns)
             .append("partitionKeyColumns", partitionKeyColumns)
             .append("clusteringColumns", clusteringColumns)
-            .append("comment", comment)
-            .append("readRepairChance", readRepairChance)
-            .append("dcLocalReadRepairChance", dcLocalReadRepairChance)
-            .append("gcGraceSeconds", gcGraceSeconds)
             .append("keyValidator", keyValidator)
-            .append("minCompactionThreshold", minCompactionThreshold)
-            .append("maxCompactionThreshold", maxCompactionThreshold)
             .append("columnMetadata", columnMetadata.values())
-            .append("compactionStrategyClass", compactionStrategyClass)
-            .append("compactionStrategyOptions", compactionStrategyOptions)
-            .append("compressionParameters", compressionParameters.asMap())
-            .append("bloomFilterFpChance", getBloomFilterFpChance())
-            .append("memtableFlushPeriod", memtableFlushPeriod)
-            .append("caching", caching)
-            .append("defaultTimeToLive", defaultTimeToLive)
-            .append("minIndexInterval", minIndexInterval)
-            .append("maxIndexInterval", maxIndexInterval)
-            .append("speculativeRetry", speculativeRetry)
             .append("droppedColumns", droppedColumns)
             .append("triggers", triggers)
             .append("materializedViews", materializedViews)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 0eadaee..a34eebb 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -807,7 +807,7 @@ alterKeyspaceStatement returns [AlterKeyspaceStatement expr]
 alterTableStatement returns [AlterTableStatement expr]
     @init {
         AlterTableStatement.Type type = null;
-        CFPropDefs props = new CFPropDefs();
+        TableAttributes attrs = new TableAttributes();
         Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> renames = new HashMap<ColumnIdentifier.Raw, ColumnIdentifier.Raw>();
         boolean isStatic = false;
     }
@@ -815,24 +815,24 @@ alterTableStatement returns [AlterTableStatement expr]
           ( K_ALTER id=cident K_TYPE v=comparatorType { type = AlterTableStatement.Type.ALTER; }
           | K_ADD   id=cident v=comparatorType ({ isStatic=true; } K_STATIC)? { type = AlterTableStatement.Type.ADD; }
           | K_DROP  id=cident                         { type = AlterTableStatement.Type.DROP; }
-          | K_WITH  properties[props]                 { type = AlterTableStatement.Type.OPTS; }
+          | K_WITH  properties[attrs]                 { type = AlterTableStatement.Type.OPTS; }
           | K_RENAME                                  { type = AlterTableStatement.Type.RENAME; }
                id1=cident K_TO toId1=cident { renames.put(id1, toId1); }
                ( K_AND idn=cident K_TO toIdn=cident { renames.put(idn, toIdn); } )*
           )
     {
-        $expr = new AlterTableStatement(cf, type, id, v, props, renames, isStatic);
+        $expr = new AlterTableStatement(cf, type, id, v, attrs, renames, isStatic);
     }
     ;
 
 alterMaterializedViewStatement returns [AlterMaterializedViewStatement expr]
     @init {
-        CFPropDefs props = new CFPropDefs();
+        TableAttributes attrs = new TableAttributes();
     }
     : K_ALTER K_MATERIALIZED K_VIEW name=columnFamilyName
-          K_WITH properties[props]
+          K_WITH properties[attrs]
     {
-        $expr = new AlterMaterializedViewStatement(name, props);
+        $expr = new AlterMaterializedViewStatement(name, attrs);
     }
     ;
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
index d2b1d13..d0116fb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterMaterializedViewStatement.java
@@ -15,12 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -33,13 +31,12 @@ import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
 public class AlterMaterializedViewStatement extends SchemaAlteringStatement
 {
-    private final CFPropDefs cfProps;
+    private final TableAttributes attrs;
 
-    public AlterMaterializedViewStatement(CFName name,
-                                          CFPropDefs cfProps)
+    public AlterMaterializedViewStatement(CFName name, TableAttributes attrs)
     {
         super(name);
-        this.cfProps = cfProps;
+        this.attrs = attrs;
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
@@ -60,13 +57,12 @@ public class AlterMaterializedViewStatement extends SchemaAlteringStatement
 
         CFMetaData cfm = meta.copy();
 
-
-        if (cfProps == null)
+        if (attrs == null)
             throw new InvalidRequestException("ALTER MATERIALIZED VIEW WITH invoked, but no parameters found");
 
-        cfProps.validate();
+        attrs.validate();
+        cfm.params(attrs.asAlteredTableParams(cfm.params));
 
-        cfProps.applyToCFMetadata(cfm);
         MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 1495f2d..b7e09d9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.transport.Event;
@@ -37,7 +38,7 @@ import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
 public class AlterTableStatement extends SchemaAlteringStatement
 {
-    public static enum Type
+    public enum Type
     {
         ADD, ALTER, DROP, OPTS, RENAME
     }
@@ -45,7 +46,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
     public final Type oType;
     public final CQL3Type.Raw validator;
     public final ColumnIdentifier.Raw rawColumnName;
-    private final CFPropDefs cfProps;
+    private final TableAttributes attrs;
     private final Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> renames;
     private final boolean isStatic; // Only for ALTER ADD
 
@@ -53,7 +54,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                Type type,
                                ColumnIdentifier.Raw columnName,
                                CQL3Type.Raw validator,
-                               CFPropDefs cfProps,
+                               TableAttributes attrs,
                                Map<ColumnIdentifier.Raw, ColumnIdentifier.Raw> renames,
                                boolean isStatic)
     {
@@ -61,7 +62,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
         this.oType = type;
         this.rawColumnName = columnName;
         this.validator = validator; // used only for ADD/ALTER commands
-        this.cfProps = cfProps;
+        this.attrs = attrs;
         this.renames = renames;
         this.isStatic = isStatic;
     }
@@ -284,15 +285,17 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                                                     builder.toString()));
                 break;
             case OPTS:
-                if (cfProps == null)
+                if (attrs == null)
                     throw new InvalidRequestException("ALTER TABLE WITH invoked, but no parameters found");
+                attrs.validate();
 
-                cfProps.validate();
+                TableParams params = attrs.asAlteredTableParams(cfm.params);
 
-                if (meta.isCounter() && cfProps.getDefaultTimeToLive() > 0)
+                if (meta.isCounter() && params.defaultTimeToLive > 0)
                     throw new InvalidRequestException("Cannot set default_time_to_live on a table with counters");
 
-                cfProps.applyToCFMetadata(cfm);
+                cfm.params(params);
+
                 break;
             case RENAME:
                 for (Map.Entry<ColumnIdentifier.Raw, ColumnIdentifier.Raw> entry : renames.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
deleted file mode 100644
index 56db85a..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3.statements;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.io.compress.CompressionParameters;
-
-public class CFPropDefs extends PropertyDefinitions
-{
-    public static final String KW_COMMENT = "comment";
-    public static final String KW_READREPAIRCHANCE = "read_repair_chance";
-    public static final String KW_DCLOCALREADREPAIRCHANCE = "dclocal_read_repair_chance";
-    public static final String KW_GCGRACESECONDS = "gc_grace_seconds";
-    public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
-    public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
-    public static final String KW_CACHING = "caching";
-    public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
-    public static final String KW_MIN_INDEX_INTERVAL = "min_index_interval";
-    public static final String KW_MAX_INDEX_INTERVAL = "max_index_interval";
-    public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
-    public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
-    public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
-
-    public static final String KW_COMPACTION = "compaction";
-    public static final String KW_COMPRESSION = "compression";
-
-    public static final String COMPACTION_STRATEGY_CLASS_KEY = "class";
-
-    public static final Set<String> keywords = new HashSet<>();
-    public static final Set<String> obsoleteKeywords = new HashSet<>();
-
-    static
-    {
-        keywords.add(KW_COMMENT);
-        keywords.add(KW_READREPAIRCHANCE);
-        keywords.add(KW_DCLOCALREADREPAIRCHANCE);
-        keywords.add(KW_GCGRACESECONDS);
-        keywords.add(KW_CACHING);
-        keywords.add(KW_DEFAULT_TIME_TO_LIVE);
-        keywords.add(KW_MIN_INDEX_INTERVAL);
-        keywords.add(KW_MAX_INDEX_INTERVAL);
-        keywords.add(KW_SPECULATIVE_RETRY);
-        keywords.add(KW_BF_FP_CHANCE);
-        keywords.add(KW_COMPACTION);
-        keywords.add(KW_COMPRESSION);
-        keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
-
-        obsoleteKeywords.add("index_interval");
-        obsoleteKeywords.add("replicate_on_write");
-        obsoleteKeywords.add("populate_io_cache_on_flush");
-    }
-
-    private Class<? extends AbstractCompactionStrategy> compactionStrategyClass = null;
-
-    public void validate() throws ConfigurationException, SyntaxException
-    {
-        // Skip validation if the comapction strategy class is already set as it means we've alreayd
-        // prepared (and redoing it would set strategyClass back to null, which we don't want)
-        if (compactionStrategyClass != null)
-            return;
-
-        validate(keywords, obsoleteKeywords);
-
-        Map<String, String> compactionOptions = getCompactionOptions();
-        if (!compactionOptions.isEmpty())
-        {
-            String strategy = compactionOptions.get(COMPACTION_STRATEGY_CLASS_KEY);
-            if (strategy == null)
-                throw new ConfigurationException("Missing sub-option '" + COMPACTION_STRATEGY_CLASS_KEY + "' for the '" + KW_COMPACTION + "' option.");
-
-            compactionStrategyClass = CFMetaData.createCompactionStrategy(strategy);
-            compactionOptions.remove(COMPACTION_STRATEGY_CLASS_KEY);
-
-            CFMetaData.validateCompactionOptions(compactionStrategyClass, compactionOptions);
-        }
-
-        Map<String, String> compressionOptions = getCompressionOptions();
-        if (!compressionOptions.isEmpty())
-        {
-            if (CompressionParameters.isEnabled(compressionOptions)
-                && !CompressionParameters.containsSstableCompressionClass(compressionOptions))
-            {
-                throw new ConfigurationException("Missing sub-option '" + CompressionParameters.CLASS + "' for the '" + KW_COMPRESSION + "' option.");
-            }
-
-            CompressionParameters compressionParameters = CompressionParameters.fromMap(compressionOptions);
-            compressionParameters.validate();
-        }
-
-        validateMinimumInt(KW_DEFAULT_TIME_TO_LIVE, 0, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE);
-
-        Integer minIndexInterval = getInt(KW_MIN_INDEX_INTERVAL, null);
-        Integer maxIndexInterval = getInt(KW_MAX_INDEX_INTERVAL, null);
-        if (minIndexInterval != null && minIndexInterval < 1)
-            throw new ConfigurationException(KW_MIN_INDEX_INTERVAL + " must be greater than 0, but was " + minIndexInterval);
-        if (maxIndexInterval != null && minIndexInterval != null && maxIndexInterval < minIndexInterval)
-            throw new ConfigurationException(KW_MAX_INDEX_INTERVAL + " must be greater than " + KW_MIN_INDEX_INTERVAL + ", but was " + maxIndexInterval);
-
-        SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, SpeculativeRetry.RetryType.NONE.name()));
-    }
-
-    public Class<? extends AbstractCompactionStrategy> getCompactionStrategy()
-    {
-        return compactionStrategyClass;
-    }
-
-    public Map<String, String> getCompactionOptions() throws SyntaxException
-    {
-        Map<String, String> compactionOptions = getMap(KW_COMPACTION);
-        if (compactionOptions == null)
-            return Collections.emptyMap();
-        return compactionOptions;
-    }
-
-    public Map<String, String> getCompressionOptions() throws SyntaxException
-    {
-        Map<String, String> compressionOptions = getMap(KW_COMPRESSION);
-        if (compressionOptions == null)
-            return Collections.emptyMap();
-        return compressionOptions;
-    }
-    public CachingOptions getCachingOptions() throws SyntaxException, ConfigurationException
-    {
-        CachingOptions options = null;
-        Object val = properties.get(KW_CACHING);
-        if (val == null)
-            return null;
-        else if (val instanceof Map)
-            options = CachingOptions.fromMap(getMap(KW_CACHING));
-        else if (val instanceof String) // legacy syntax
-        {
-            options = CachingOptions.fromString(getSimple(KW_CACHING));
-            logger.warn("Setting caching options with deprecated syntax. {}", val);
-        }
-        return options;
-    }
-
-    public Integer getDefaultTimeToLive() throws SyntaxException
-    {
-        return getInt(KW_DEFAULT_TIME_TO_LIVE, 0);
-    }
-
-    public void applyToCFMetadata(CFMetaData cfm) throws ConfigurationException, SyntaxException
-    {
-        if (hasProperty(KW_COMMENT))
-            cfm.comment(getString(KW_COMMENT, ""));
-
-        cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance()));
-        cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepairChance()));
-        cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
-        int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold());
-        int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold());
-        if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)
-            throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead.");
-        cfm.minCompactionThreshold(minCompactionThreshold);
-        cfm.maxCompactionThreshold(maxCompactionThreshold);
-        cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
-        cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
-        cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
-        cfm.minIndexInterval(getInt(KW_MIN_INDEX_INTERVAL, cfm.getMinIndexInterval()));
-        cfm.maxIndexInterval(getInt(KW_MAX_INDEX_INTERVAL, cfm.getMaxIndexInterval()));
-
-        if (compactionStrategyClass != null)
-        {
-            cfm.compactionStrategyClass(compactionStrategyClass);
-            cfm.compactionStrategyOptions(new HashMap<>(getCompactionOptions()));
-        }
-
-        cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
-
-        if (!getCompressionOptions().isEmpty())
-        {
-            CompressionParameters compressionParameters = CompressionParameters.fromMap(getCompressionOptions());
-            compressionParameters.validate();
-            cfm.compressionParameters(compressionParameters);
-        }
-        CachingOptions cachingOptions = getCachingOptions();
-        if (cachingOptions != null)
-            cfm.caching(cachingOptions);
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("CFPropDefs(%s)", properties);
-    }
-
-    private void validateMinimumInt(String field, int minimumValue, int defaultValue) throws SyntaxException, ConfigurationException
-    {
-        Integer val = getInt(field, null);
-        if (val != null && val < minimumValue)
-            throw new ConfigurationException(String.format("%s cannot be smaller than %d, (default %d), but was %d",
-                                                            field, minimumValue, defaultValue, val));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFProperties.java b/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
index 50ec360..92dd994 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFProperties.java
@@ -15,10 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.cql3.statements;
 
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -28,7 +26,7 @@ import org.apache.cassandra.db.marshal.ReversedType;
 
 public class CFProperties
 {
-    public final CFPropDefs properties = new CFPropDefs();
+    public final TableAttributes properties = new TableAttributes();
     final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<>(); // Insertion ordering is important
     boolean useCompactStorage = false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 6e28f8c..aa58fb4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -30,54 +30,40 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
-/** A <code>CREATE TABLE</code> parsed from a CQL query statement. */
+/** A {@code CREATE TABLE} parsed from a CQL query statement. */
 public class CreateTableStatement extends SchemaAlteringStatement
 {
     private List<AbstractType<?>> keyTypes;
     private List<AbstractType<?>> clusteringTypes;
 
-    private Map<ByteBuffer, CollectionType> collections = new HashMap<>();
+    private final Map<ByteBuffer, CollectionType> collections = new HashMap<>();
 
     private final List<ColumnIdentifier> keyAliases = new ArrayList<>();
     private final List<ColumnIdentifier> columnAliases = new ArrayList<>();
-    private ByteBuffer valueAlias;
 
     private boolean isDense;
     private boolean isCompound;
     private boolean hasCounters;
 
     // use a TreeMap to preserve ordering across JDK versions (see CASSANDRA-9492)
-    private final Map<ColumnIdentifier, AbstractType> columns = new TreeMap<>(new Comparator<ColumnIdentifier>()
-    {
-        public int compare(ColumnIdentifier o1, ColumnIdentifier o2)
-        {
-            return o1.bytes.compareTo(o2.bytes);
-        }
-    });
+    private final Map<ColumnIdentifier, AbstractType> columns = new TreeMap<>((o1, o2) -> o1.bytes.compareTo(o2.bytes));
+
     private final Set<ColumnIdentifier> staticColumns;
-    private final CFPropDefs properties;
+    private final TableParams params;
     private final boolean ifNotExists;
 
-    public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists, Set<ColumnIdentifier> staticColumns)
+    public CreateTableStatement(CFName name, TableParams params, boolean ifNotExists, Set<ColumnIdentifier> staticColumns)
     {
         super(name);
-        this.properties = properties;
+        this.params = params;
         this.ifNotExists = ifNotExists;
         this.staticColumns = staticColumns;
-
-        if (!this.properties.hasProperty(CFPropDefs.KW_COMPRESSION) && CFMetaData.DEFAULT_COMPRESSOR != null)
-            this.properties.addProperty(CFPropDefs.KW_COMPRESSION,
-                                        new HashMap<String, String>()
-                                        {{
-                                            put(CompressionParameters.CLASS, CFMetaData.DEFAULT_COMPRESSOR);
-                                        }});
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
@@ -168,21 +154,19 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
     /**
      * Returns a CFMetaData instance based on the parameters parsed from this
-     * <code>CREATE</code> statement, or defaults where applicable.
+     * {@code CREATE} statement, or defaults where applicable.
      *
      * @return a CFMetaData instance corresponding to the values parsed from this statement
      * @throws InvalidRequestException on failure to validate parsed parameters
      */
-    public CFMetaData getCFMetaData() throws RequestValidationException
+    public CFMetaData getCFMetaData()
     {
-        CFMetaData newCFMD = metadataBuilder().build();
-        applyPropertiesTo(newCFMD);
-        return newCFMD;
+        return metadataBuilder().build().params(params);
     }
 
-    public void applyPropertiesTo(CFMetaData cfmd) throws RequestValidationException
+    public TableParams params()
     {
-        properties.applyToCFMetadata(cfmd);
+        return params;
     }
 
     public static class RawStatement extends CFStatement
@@ -190,9 +174,9 @@ public class CreateTableStatement extends SchemaAlteringStatement
         private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>();
         public final CFProperties properties = new CFProperties();
 
-        private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
-        private final List<ColumnIdentifier> columnAliases = new ArrayList<ColumnIdentifier>();
-        private final Set<ColumnIdentifier> staticColumns = new HashSet<ColumnIdentifier>();
+        private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<>();
+        private final List<ColumnIdentifier> columnAliases = new ArrayList<>();
+        private final Set<ColumnIdentifier> staticColumns = new HashSet<>();
 
         private final Multiset<ColumnIdentifier> definedNames = HashMultiset.create(1);
 
@@ -221,7 +205,9 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
             properties.validate();
 
-            CreateTableStatement stmt = new CreateTableStatement(cfName, properties.properties, ifNotExists, staticColumns);
+            TableParams params = properties.properties.asNewTableParams();
+
+            CreateTableStatement stmt = new CreateTableStatement(cfName, params, ifNotExists, staticColumns);
 
             for (Map.Entry<ColumnIdentifier, CQL3Type.Raw> entry : definitions.entrySet())
             {
@@ -238,11 +224,11 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException("No PRIMARY KEY specifed (exactly one required)");
             if (keyAliases.size() > 1)
                 throw new InvalidRequestException("Multiple PRIMARY KEYs specifed (exactly one required)");
-            if (stmt.hasCounters && properties.properties.getDefaultTimeToLive() > 0)
+            if (stmt.hasCounters && params.defaultTimeToLive > 0)
                 throw new InvalidRequestException("Cannot set default_time_to_live on a table with counters");
 
             List<ColumnIdentifier> kAliases = keyAliases.get(0);
-            stmt.keyTypes = new ArrayList<AbstractType<?>>(kAliases.size());
+            stmt.keyTypes = new ArrayList<>(kAliases.size());
             for (ColumnIdentifier alias : kAliases)
             {
                 stmt.keyAliases.add(alias);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java b/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
index d931530..db6b0d6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
@@ -21,34 +21,42 @@ import java.util.*;
 
 import com.google.common.collect.ImmutableSet;
 
-import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.KeyspaceParams.Option;
-import org.apache.cassandra.schema.KeyspaceParams.Replication;
+import org.apache.cassandra.schema.ReplicationParams;
 
-public class KeyspaceAttributes extends PropertyDefinitions
+public final class KeyspaceAttributes extends PropertyDefinitions
 {
-    private static final Set<String> keywords = ImmutableSet.of(Option.DURABLE_WRITES.toString(), Option.REPLICATION.toString());
-    private static final Set<String> obsoleteKeywords = ImmutableSet.of();
+    private static final Set<String> validKeywords;
+    private static final Set<String> obsoleteKeywords;
 
-    public void validate() throws SyntaxException
+    static
     {
-        validate(keywords, obsoleteKeywords);
+        ImmutableSet.Builder<String> validBuilder = ImmutableSet.builder();
+        for (Option option : Option.values())
+            validBuilder.add(option.toString());
+        validKeywords = validBuilder.build();
+        obsoleteKeywords = ImmutableSet.of();
+    }
+
+    public void validate()
+    {
+        validate(validKeywords, obsoleteKeywords);
     }
 
     public String getReplicationStrategyClass()
     {
-        return getAllReplicationOptions().get(Replication.CLASS);
+        return getAllReplicationOptions().get(ReplicationParams.CLASS);
     }
 
-    public Map<String, String> getReplicationOptions() throws SyntaxException
+    public Map<String, String> getReplicationOptions()
     {
         Map<String, String> replication = new HashMap<>(getAllReplicationOptions());
-        replication.remove(Replication.CLASS);
+        replication.remove(ReplicationParams.CLASS);
         return replication;
     }
 
-    public Map<String, String> getAllReplicationOptions() throws SyntaxException
+    public Map<String, String> getAllReplicationOptions()
     {
         Map<String, String> replication = getMap(Option.REPLICATION.toString());
         return replication == null
@@ -65,9 +73,9 @@ public class KeyspaceAttributes extends PropertyDefinitions
     public KeyspaceParams asAlteredKeyspaceParams(KeyspaceParams previous)
     {
         boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), previous.durableWrites);
-        Replication replication = getReplicationStrategyClass() == null
-                                ? previous.replication
-                                : Replication.fromMap(getAllReplicationOptions());
+        ReplicationParams replication = getReplicationStrategyClass() == null
+                                      ? previous.replication
+                                      : ReplicationParams.fromMap(getAllReplicationOptions());
         return new KeyspaceParams(durableWrites, replication);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
new file mode 100644
index 0000000..ed64f0d
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/TableAttributes.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.TableParams.Option;
+
+import static java.lang.String.format;
+
+public final class TableAttributes extends PropertyDefinitions
+{
+    private static final Set<String> validKeywords;
+    private static final Set<String> obsoleteKeywords;
+
+    static
+    {
+        ImmutableSet.Builder<String> validBuilder = ImmutableSet.builder();
+        for (Option option : Option.values())
+            validBuilder.add(option.toString());
+        validKeywords = validBuilder.build();
+        obsoleteKeywords = ImmutableSet.of();
+    }
+
+    public void validate()
+    {
+        validate(validKeywords, obsoleteKeywords);
+        build(TableParams.builder()).validate();
+    }
+
+    public TableParams asNewTableParams()
+    {
+        return build(TableParams.builder());
+    }
+
+    public TableParams asAlteredTableParams(TableParams previous)
+    {
+        return build(TableParams.builder(previous));
+    }
+
+    private TableParams build(TableParams.Builder builder)
+    {
+        if (hasOption(Option.BLOOM_FILTER_FP_CHANCE))
+            builder.bloomFilterFpChance(getDouble(Option.BLOOM_FILTER_FP_CHANCE));
+
+        if (hasOption(Option.CACHING))
+            builder.caching(CachingParams.fromMap(getMap(Option.CACHING)));
+
+        if (hasOption(Option.COMMENT))
+            builder.comment(getString(Option.COMMENT));
+
+        if (hasOption(Option.COMPACTION))
+            builder.compaction(CompactionParams.fromMap(getMap(Option.COMPACTION)));
+
+        if (hasOption(Option.COMPRESSION))
+            builder.compression(CompressionParams.fromMap(getMap(Option.COMPRESSION)));
+
+        if (hasOption(Option.DCLOCAL_READ_REPAIR_CHANCE))
+            builder.dcLocalReadRepairChance(getDouble(Option.DCLOCAL_READ_REPAIR_CHANCE));
+
+        if (hasOption(Option.DEFAULT_TIME_TO_LIVE))
+            builder.defaultTimeToLive(getInt(Option.DEFAULT_TIME_TO_LIVE));
+
+        if (hasOption(Option.GC_GRACE_SECONDS))
+            builder.gcGraceSeconds(getInt(Option.GC_GRACE_SECONDS));
+
+        if (hasOption(Option.MAX_INDEX_INTERVAL))
+            builder.maxIndexInterval(getInt(Option.MAX_INDEX_INTERVAL));
+
+        if (hasOption(Option.MEMTABLE_FLUSH_PERIOD_IN_MS))
+            builder.memtableFlushPeriodInMs(getInt(Option.MEMTABLE_FLUSH_PERIOD_IN_MS));
+
+        if (hasOption(Option.MIN_INDEX_INTERVAL))
+            builder.minIndexInterval(getInt(Option.MIN_INDEX_INTERVAL));
+
+        if (hasOption(Option.READ_REPAIR_CHANCE))
+            builder.readRepairChance(getDouble(Option.READ_REPAIR_CHANCE));
+
+        if (hasOption(Option.SPECULATIVE_RETRY))
+            builder.speculativeRetry(SpeculativeRetryParam.fromString(getString(Option.SPECULATIVE_RETRY)));
+
+        return builder.build();
+    }
+
+    private double getDouble(Option option)
+    {
+        String value = getString(option);
+
+        try
+        {
+            return Double.parseDouble(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new SyntaxException(format("Invalid double value %s for '%s'", value, option));
+        }
+    }
+
+    private int getInt(Option option)
+    {
+        String value = getString(option);
+
+        try
+        {
+            return Integer.parseInt(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new SyntaxException(String.format("Invalid integer value %s for '%s'", value, option));
+        }
+    }
+
+    private String getString(Option option)
+    {
+        String value = getSimple(option.toString());
+        if (value == null)
+            throw new IllegalStateException(format("Option '%s' is absent", option));
+        return value;
+    }
+
+    private Map<String, String> getMap(Option option)
+    {
+        Map<String, String> value = getMap(option.toString());
+        if (value == null)
+            throw new IllegalStateException(format("Option '%s' is absent", option));
+        return value;
+    }
+
+    private boolean hasOption(Option option)
+    {
+        return hasProperty(option.toString());
+    }
+}