You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/07/01 15:39:08 UTC

[3/3] cassandra git commit: Factor out KSMetaData.KeyspaceParams

Factor out KSMetaData.KeyspaceParams

patch by Aleksey Yeschenko; reviewed by Robert Stupp for CASSANDRA-9677


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/31e3f612
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/31e3f612
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/31e3f612

Branch: refs/heads/trunk
Commit: 31e3f612b113a76ca8c04a3a86aa6df3915ad055
Parents: 6e1033b
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Jun 30 20:38:39 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jul 1 16:36:12 2015 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/auth/AuthKeyspace.java |  13 +-
 .../org/apache/cassandra/config/KSMetaData.java | 130 ++++---------
 .../org/apache/cassandra/config/Schema.java     |  94 ++++------
 src/java/org/apache/cassandra/cql3/Cql.g        |   4 +-
 .../cql3/statements/AlterKeyspaceStatement.java |  30 ++-
 .../statements/CreateKeyspaceStatement.java     |  20 +-
 .../cassandra/cql3/statements/KSPropDefs.java   |  89 ---------
 .../cql3/statements/KeyspaceAttributes.java     |  73 +++++++
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |  17 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   5 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  16 +-
 .../repair/SystemDistributedKeyspace.java       |  12 +-
 .../apache/cassandra/schema/KeyspaceParams.java | 188 +++++++++++++++++++
 .../cassandra/schema/LegacySchemaTables.java    |  57 +++---
 .../cassandra/service/StorageService.java       |   4 +-
 .../cassandra/thrift/ThriftConversion.java      |  18 +-
 .../apache/cassandra/tracing/TraceKeyspace.java |  11 +-
 .../db/compaction/LongCompactionsTest.java      |   6 +-
 .../LongLeveledCompactionStrategyTest.java      |   8 +-
 test/unit/org/apache/cassandra/MockSchema.java  |   5 +-
 .../unit/org/apache/cassandra/SchemaLoader.java | 119 ++++--------
 .../cassandra/cache/AutoSavingCacheTest.java    |  15 +-
 .../cassandra/cache/CacheProviderTest.java      |  12 +-
 .../apache/cassandra/config/CFMetaDataTest.java |   7 +-
 .../config/DatabaseDescriptorTest.java          |   6 +-
 .../apache/cassandra/config/KSMetaDataTest.java |  45 -----
 .../config/LegacySchemaTablesTest.java          |   7 +-
 .../cassandra/cql3/ThriftCompatibilityTest.java |  14 +-
 .../cql3/validation/entities/UFTest.java        |   3 +-
 .../validation/operations/AggregationTest.java  |   2 +-
 .../cassandra/db/BatchlogManagerTest.java       |  12 +-
 test/unit/org/apache/cassandra/db/CellTest.java |  13 +-
 .../org/apache/cassandra/db/CleanupTest.java    |   6 +-
 .../cassandra/db/ColumnFamilyMetricTest.java    |   8 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   9 +-
 .../org/apache/cassandra/db/CommitLogTest.java  |  17 +-
 .../apache/cassandra/db/CounterCacheTest.java   |   9 +-
 .../apache/cassandra/db/CounterCellTest.java    |   6 +-
 .../cassandra/db/CounterMutationTest.java       |  11 +-
 .../cassandra/db/DeletePartitionTest.java       |   8 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |   6 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  13 +-
 .../org/apache/cassandra/db/NameSortTest.java   |   8 +-
 .../cassandra/db/PartitionRangeReadTest.java    |  16 +-
 .../org/apache/cassandra/db/PartitionTest.java  |   7 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |   7 +-
 .../apache/cassandra/db/ReadMessageTest.java    |  17 +-
 .../db/RecoveryManagerFlushedTest.java          |  15 +-
 .../db/RecoveryManagerMissingHeaderTest.java    |  14 +-
 .../cassandra/db/RecoveryManagerTest.java       |  12 +-
 .../db/RecoveryManagerTruncateTest.java         |  11 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  18 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   7 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  16 +-
 .../apache/cassandra/db/SecondaryIndexTest.java |   9 +-
 .../org/apache/cassandra/db/VerifyTest.java     |  15 +-
 .../db/compaction/AntiCompactionTest.java       |   6 +-
 .../compaction/BlacklistingCompactionsTest.java |   4 +-
 .../db/compaction/CompactionsPurgeTest.java     |  31 ++-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../DateTieredCompactionStrategyTest.java       |   9 +-
 .../LeveledCompactionStrategyTest.java          |   8 +-
 .../db/compaction/OneCompactionTest.java        |   8 +-
 .../SizeTieredCompactionStrategyTest.java       |   9 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  32 ++--
 .../db/index/PerRowSecondaryIndexTest.java      |   9 +-
 .../cassandra/db/marshal/CompositeTypeTest.java |   7 +-
 .../db/marshal/DynamicCompositeTypeTest.java    |   7 +-
 .../apache/cassandra/dht/KeyCollisionTest.java  |   9 +-
 .../io/sstable/BigTableWriterTest.java          |   6 +-
 .../io/sstable/IndexSummaryManagerTest.java     |  16 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   7 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   7 +-
 .../io/sstable/SSTableMetadataTest.java         |  17 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  12 +-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../io/sstable/SSTableScannerTest.java          |   4 +-
 .../locator/OldNetworkTopologyStrategyTest.java |  16 +-
 .../ReplicationStrategyEndpointCacheTest.java   |   6 +-
 .../cassandra/locator/SimpleStrategyTest.java   |  10 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |   6 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   7 +-
 .../org/apache/cassandra/schema/DefsTest.java   |  34 ++--
 .../service/ActiveRepairServiceTest.java        |  10 +-
 .../cassandra/service/DataResolverTest.java     |  11 +-
 .../service/EmbeddedCassandraServiceTest.java   |  16 +-
 .../service/LeaveAndBootstrapTest.java          |   4 +-
 .../org/apache/cassandra/service/MoveTest.java  |   4 +-
 .../cassandra/service/QueryPagerTest.java       |   9 +-
 .../service/StorageServiceServerTest.java       |  26 +--
 .../streaming/StreamTransferTaskTest.java       |   8 +-
 .../streaming/StreamingTransferTest.java        |  38 ++--
 .../cassandra/triggers/TriggersSchemaTest.java  |  29 +--
 .../cassandra/utils/EncodedStreamsTest.java     |  10 +-
 96 files changed, 755 insertions(+), 1029 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/auth/AuthKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
index d8e31ae..17a9f72 100644
--- a/src/java/org/apache/cassandra/auth/AuthKeyspace.java
+++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
@@ -19,15 +19,17 @@ package org.apache.cassandra.auth;
 
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
 
-public class AuthKeyspace
+public final class AuthKeyspace
 {
+    private AuthKeyspace()
+    {
+    }
+
     public static final String NAME = "system_auth";
 
     public static final String ROLES = "roles";
@@ -83,7 +85,6 @@ public class AuthKeyspace
 
     public static KSMetaData definition()
     {
-        Tables tables = Tables.of(Roles, RoleMembers, RolePermissions, ResourceRoleIndex);
-        return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), true, tables);
+        return KSMetaData.create(NAME, KeyspaceParams.simple(1), Tables.of(Roles, RoleMembers, RolePermissions, ResourceRoleIndex));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index a6a796c..8683cfa 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -17,121 +17,70 @@
  */
 package org.apache.cassandra.config;
 
-import java.util.*;
-
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.schema.Functions;
-import org.apache.cassandra.schema.Tables;
-import org.apache.cassandra.schema.Types;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.*;
 
+/**
+ * An immutable representation of keyspace metadata (name, params, tables, types, and functions).
+ */
 public final class KSMetaData
 {
     public final String name;
-    public final Class<? extends AbstractReplicationStrategy> strategyClass;
-    public final Map<String, String> strategyOptions;
-    public final boolean durableWrites;
-
+    public final KeyspaceParams params;
     public final Tables tables;
     public final Types types;
     public final Functions functions;
 
-    public KSMetaData(String name,
-                      Class<? extends AbstractReplicationStrategy> strategyClass,
-                      Map<String, String> strategyOptions,
-                      boolean durableWrites)
-    {
-        this(name, strategyClass, strategyOptions, durableWrites, Tables.none(), Types.none(), Functions.none());
-    }
-
-    public KSMetaData(String name,
-                      Class<? extends AbstractReplicationStrategy> strategyClass,
-                      Map<String, String> strategyOptions,
-                      boolean durableWrites,
-                      Tables tables)
-    {
-        this(name, strategyClass, strategyOptions, durableWrites, tables, Types.none(), Functions.none());
-    }
-
-    public KSMetaData(String name,
-                      Class<? extends AbstractReplicationStrategy> strategyClass,
-                      Map<String, String> strategyOptions,
-                      boolean durableWrites,
-                      Tables tables,
-                      Functions functions)
-    {
-        this(name, strategyClass, strategyOptions, durableWrites, tables, Types.none(), functions);
-    }
-
-    private KSMetaData(String name,
-                       Class<? extends AbstractReplicationStrategy> strategyClass,
-                       Map<String, String> strategyOptions,
-                       boolean durableWrites,
-                       Tables tables,
-                       Types types,
-                       Functions functions)
+    private KSMetaData(String name, KeyspaceParams params, Tables tables, Types types, Functions functions)
     {
         this.name = name;
-        this.strategyClass = strategyClass == null ? NetworkTopologyStrategy.class : strategyClass;
-        this.strategyOptions = strategyOptions;
-        this.durableWrites = durableWrites;
+        this.params = params;
         this.tables = tables;
         this.types = types;
         this.functions = functions;
     }
 
-    // For new user created keyspaces (through CQL)
-    public static KSMetaData newKeyspace(String name, String strategyName, Map<String, String> options, boolean durableWrites) throws ConfigurationException
-    {
-        Class<? extends AbstractReplicationStrategy> cls = AbstractReplicationStrategy.getClass(strategyName);
-        if (cls.equals(LocalStrategy.class))
-            throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
-
-        return newKeyspace(name, cls, options, durableWrites, Tables.none());
-    }
-
-    public static KSMetaData newKeyspace(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> options, boolean durablesWrites, Iterable<CFMetaData> cfDefs)
+    public static KSMetaData create(String name, KeyspaceParams params)
     {
-        return new KSMetaData(name, strategyClass, options, durablesWrites, Tables.of(cfDefs), Types.none(), Functions.none());
+        return new KSMetaData(name, params, Tables.none(), Types.none(), Functions.none());
     }
 
-    public KSMetaData cloneWith(Tables tables, Types types, Functions functions)
+    public static KSMetaData create(String name, KeyspaceParams params, Tables tables)
     {
-        return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types, functions);
+        return new KSMetaData(name, params, tables, Types.none(), Functions.none());
     }
 
-    public KSMetaData cloneWith(Tables tables)
+    public static KSMetaData create(String name, KeyspaceParams params, Tables tables, Types types, Functions functions)
     {
-        return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types, functions);
+        return new KSMetaData(name, params, tables, types, functions);
     }
 
-    public KSMetaData cloneWith(Types types)
+    public KSMetaData withSwapped(KeyspaceParams params)
     {
-        return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types, functions);
+        return new KSMetaData(name, params, tables, types, functions);
     }
 
-    public KSMetaData cloneWith(Functions functions)
+    public KSMetaData withSwapped(Tables tables)
     {
-        return new KSMetaData(name, strategyClass, strategyOptions, durableWrites, tables, types, functions);
+        return new KSMetaData(name, params, tables, types, functions);
     }
 
-    public static KSMetaData testMetadata(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
+    public KSMetaData withSwapped(Types types)
     {
-        return new KSMetaData(name, strategyClass, strategyOptions, true, Tables.of(cfDefs));
+        return new KSMetaData(name, params, tables, types, functions);
     }
 
-    public static KSMetaData testMetadataNotDurable(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs)
+    public KSMetaData withSwapped(Functions functions)
     {
-        return new KSMetaData(name, strategyClass, strategyOptions, false, Tables.of(cfDefs));
+        return new KSMetaData(name, params, tables, types, functions);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, strategyClass, strategyOptions, durableWrites, tables, functions, types);
+        return Objects.hashCode(name, params, tables, functions, types);
     }
 
     @Override
@@ -145,13 +94,11 @@ public final class KSMetaData
 
         KSMetaData other = (KSMetaData) o;
 
-        return Objects.equal(name, other.name)
-            && Objects.equal(strategyClass, other.strategyClass)
-            && Objects.equal(strategyOptions, other.strategyOptions)
-            && Objects.equal(durableWrites, other.durableWrites)
-            && Objects.equal(tables, other.tables)
-            && Objects.equal(functions, other.functions)
-            && Objects.equal(types, other.types);
+        return name.equals(other.name)
+            && params.equals(other.params)
+            && tables.equals(other.tables)
+            && functions.equals(other.functions)
+            && types.equals(other.types);
     }
 
     @Override
@@ -159,31 +106,20 @@ public final class KSMetaData
     {
         return Objects.toStringHelper(this)
                       .add("name", name)
-                      .add("strategyClass", strategyClass.getSimpleName())
-                      .add("strategyOptions", strategyOptions)
-                      .add("durableWrites", durableWrites)
+                      .add("params", params)
                       .add("tables", tables)
                       .add("functions", functions)
                       .add("types", types)
                       .toString();
     }
 
-    public static Map<String,String> optsWithRF(final Integer rf)
-    {
-        return Collections.singletonMap("replication_factor", rf.toString());
-    }
-
-    public KSMetaData validate() throws ConfigurationException
+    public void validate()
     {
         if (!CFMetaData.isNameValid(name))
-            throw new ConfigurationException(String.format("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", Schema.NAME_LENGTH, name));
-
-        // Attempt to instantiate the ARS, which will throw a ConfigException if the strategy_options aren't fully formed
-        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
-        IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch();
-        AbstractReplicationStrategy.validateReplicationStrategy(name, strategyClass, tmd, eps, strategyOptions);
-
+            throw new ConfigurationException(String.format("Keyspace name must not be empty, more than %s characters long, "
+                                                           + "or contain non-alphanumeric-underscore characters (got \"%s\")",
+                                                           Schema.NAME_LENGTH,
+                                                           name));
         tables.forEach(CFMetaData::validate);
-        return this;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 160d7ed..aba9240 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.MigrationManager;
@@ -288,7 +289,11 @@ public class Schema
     public void setKeyspaceDefinition(KSMetaData ksm)
     {
         assert ksm != null;
+
         keyspaces.put(ksm.name, ksm);
+        Keyspace keyspace = getKeyspaceInstance(ksm.name);
+        if (keyspace != null)
+            keyspace.setMetadata(ksm);
     }
 
     /* ColumnFamily query/control methods */
@@ -444,15 +449,11 @@ public class Schema
         MigrationManager.instance.notifyCreateKeyspace(ksm);
     }
 
-    public void updateKeyspace(String ksName)
+    public void updateKeyspace(String ksName, KeyspaceParams newParams)
     {
-        KSMetaData oldKsm = getKSMetaData(ksName);
-        assert oldKsm != null;
-        KSMetaData newKsm = LegacySchemaTables.createKeyspaceFromName(ksName)
-                                              .cloneWith(oldKsm.tables, oldKsm.types, oldKsm.functions);
-        setKeyspaceDefinition(newKsm);
-        Keyspace.open(ksName).createReplicationStrategy(newKsm);
-        MigrationManager.instance.notifyUpdateKeyspace(newKsm);
+        KSMetaData ksm = update(ksName, ks -> ks.withSwapped(newParams));
+        Keyspace.open(ksName).createReplicationStrategy(ksm);
+        MigrationManager.instance.notifyUpdateKeyspace(ksm);
     }
 
     public void dropKeyspace(String ksName)
@@ -494,18 +495,18 @@ public class Schema
     public void addTable(CFMetaData cfm)
     {
         assert getCFMetaData(cfm.ksName, cfm.cfName) == null;
-        KSMetaData oldKsm = getKSMetaData(cfm.ksName);
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.tables.with(cfm));
 
-        logger.info("Loading {}", cfm);
+        update(cfm.ksName, ks ->
+        {
+            load(cfm);
 
-        load(cfm);
+            // make sure it's init-ed w/ the old definitions first,
+            // since we're going to call initCf on the new one manually
+            Keyspace.open(cfm.ksName);
 
-        // make sure it's init-ed w/ the old definitions first,
-        // since we're going to call initCf on the new one manually
-        Keyspace.open(cfm.ksName);
+            return ks.withSwapped(ks.tables.with(cfm));
+        });
 
-        setKeyspaceDefinition(newKsm);
         Keyspace.open(cfm.ksName).initCf(cfm.cfId, cfm.cfName, true);
         MigrationManager.instance.notifyCreateColumnFamily(cfm);
     }
@@ -530,7 +531,7 @@ public class Schema
 
         // reinitialize the keyspace.
         CFMetaData cfm = oldKsm.tables.get(tableName).get();
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.tables.without(tableName));
+        KSMetaData newKsm = oldKsm.withSwapped(oldKsm.tables.without(tableName));
 
         purge(cfm);
         setKeyspaceDefinition(newKsm);
@@ -547,94 +548,67 @@ public class Schema
 
     public void addType(UserType ut)
     {
-        KSMetaData oldKsm = getKSMetaData(ut.keyspace);
-        assert oldKsm != null;
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.types.with(ut));
-        setKeyspaceDefinition(newKsm);
+        update(ut.keyspace, ks -> ks.withSwapped(ks.types.with(ut)));
         MigrationManager.instance.notifyCreateUserType(ut);
     }
 
     public void updateType(UserType ut)
     {
-        KSMetaData oldKsm = getKSMetaData(ut.keyspace);
-        assert oldKsm != null;
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.types.without(ut.name).with(ut));
-        setKeyspaceDefinition(newKsm);
+        update(ut.keyspace, ks -> ks.withSwapped(ks.types.without(ut.name).with(ut)));
         MigrationManager.instance.notifyUpdateUserType(ut);
     }
 
     public void dropType(UserType ut)
     {
-        KSMetaData oldKsm = getKSMetaData(ut.keyspace);
-        assert oldKsm != null;
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.types.without(ut.name));
-        setKeyspaceDefinition(newKsm);
+        update(ut.keyspace, ks -> ks.withSwapped(ks.types.without(ut.name)));
         MigrationManager.instance.notifyDropUserType(ut);
     }
 
     public void addFunction(UDFunction udf)
     {
-        addFunctionInternal(udf);
+        update(udf.name().keyspace, ks -> ks.withSwapped(ks.functions.with(udf)));
         MigrationManager.instance.notifyCreateFunction(udf);
     }
 
     public void updateFunction(UDFunction udf)
     {
-        updateFunctionInternal(udf);
+        update(udf.name().keyspace, ks -> ks.withSwapped(ks.functions.without(udf.name(), udf.argTypes()).with(udf)));
         MigrationManager.instance.notifyUpdateFunction(udf);
     }
 
     public void dropFunction(UDFunction udf)
     {
-        dropFunctionInternal(udf);
+        update(udf.name().keyspace, ks -> ks.withSwapped(ks.functions.without(udf.name(), udf.argTypes())));
         MigrationManager.instance.notifyDropFunction(udf);
     }
 
     public void addAggregate(UDAggregate uda)
     {
-        addFunctionInternal(uda);
+        update(uda.name().keyspace, ks -> ks.withSwapped(ks.functions.with(uda)));
         MigrationManager.instance.notifyCreateAggregate(uda);
     }
 
     public void updateAggregate(UDAggregate uda)
     {
-        updateFunctionInternal(uda);
+        update(uda.name().keyspace, ks -> ks.withSwapped(ks.functions.without(uda.name(), uda.argTypes()).with(uda)));
         MigrationManager.instance.notifyUpdateAggregate(uda);
     }
 
     public void dropAggregate(UDAggregate uda)
     {
-        dropFunctionInternal(uda);
+        update(uda.name().keyspace, ks -> ks.withSwapped(ks.functions.without(uda.name(), uda.argTypes())));
         MigrationManager.instance.notifyDropAggregate(uda);
     }
 
-    private void addFunctionInternal(Function fun)
+    private KSMetaData update(String keyspaceName, java.util.function.Function<KSMetaData, KSMetaData> transformation)
     {
-        assert fun instanceof UDFunction || fun instanceof UDAggregate;
+        KSMetaData current = getKSMetaData(keyspaceName);
+        if (current == null)
+            throw new IllegalStateException(String.format("Keyspace %s doesn't exist", keyspaceName));
 
-        KSMetaData oldKsm = getKSMetaData(fun.name().keyspace);
-        assert oldKsm != null;
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.functions.with(fun));
-        setKeyspaceDefinition(newKsm);
-    }
-
-    private void updateFunctionInternal(Function fun)
-    {
-        assert fun instanceof UDFunction || fun instanceof UDAggregate;
-
-        KSMetaData oldKsm = getKSMetaData(fun.name().keyspace);
-        assert oldKsm != null;
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.functions.without(fun.name(), fun.argTypes()).with(fun));
-        setKeyspaceDefinition(newKsm);
-    }
-
-    private void dropFunctionInternal(Function fun)
-    {
-        assert fun instanceof UDFunction || fun instanceof UDAggregate;
+        KSMetaData transformed = transformation.apply(current);
+        setKeyspaceDefinition(transformed);
 
-        KSMetaData oldKsm = getKSMetaData(fun.name().keyspace);
-        assert oldKsm != null;
-        KSMetaData newKsm = oldKsm.cloneWith(oldKsm.functions.without(fun.name(), fun.argTypes()));
-        setKeyspaceDefinition(newKsm);
+        return transformed;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 093a47c..df4623d 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -642,7 +642,7 @@ dropFunctionStatement returns [DropFunctionStatement expr]
  */
 createKeyspaceStatement returns [CreateKeyspaceStatement expr]
     @init {
-        KSPropDefs attrs = new KSPropDefs();
+        KeyspaceAttributes attrs = new KeyspaceAttributes();
         boolean ifNotExists = false;
     }
     : K_CREATE K_KEYSPACE (K_IF K_NOT K_EXISTS { ifNotExists = true; } )? ks=keyspaceName
@@ -760,7 +760,7 @@ dropTriggerStatement returns [DropTriggerStatement expr]
  * ALTER KEYSPACE <KS> WITH <property> = <value>;
  */
 alterKeyspaceStatement returns [AlterKeyspaceStatement expr]
-    @init { KSPropDefs attrs = new KSPropDefs(); }
+    @init { KeyspaceAttributes attrs = new KeyspaceAttributes(); }
     : K_ALTER K_KEYSPACE ks=keyspaceName
         K_WITH properties[attrs] { $expr = new AlterKeyspaceStatement(ks, attrs); }
     ;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index 50c3f00..502160d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -18,23 +18,22 @@
 package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.Event;
 
 public class AlterKeyspaceStatement extends SchemaAlteringStatement
 {
     private final String name;
-    private final KSPropDefs attrs;
+    private final KeyspaceAttributes attrs;
 
-    public AlterKeyspaceStatement(String name, KSPropDefs attrs)
+    public AlterKeyspaceStatement(String name, KeyspaceAttributes attrs)
     {
         super();
         this.name = name;
@@ -63,30 +62,29 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
         attrs.validate();
 
         if (attrs.getReplicationStrategyClass() == null && !attrs.getReplicationOptions().isEmpty())
-        {
             throw new ConfigurationException("Missing replication strategy class");
-        }
-        else if (attrs.getReplicationStrategyClass() != null)
+
+        if (attrs.getReplicationStrategyClass() != null)
         {
             // The strategy is validated through KSMetaData.validate() in announceKeyspaceUpdate below.
             // However, for backward compatibility with thrift, this doesn't validate unexpected options yet,
             // so doing proper validation here.
-            AbstractReplicationStrategy.validateReplicationStrategy(name,
-                                                                    AbstractReplicationStrategy.getClass(attrs.getReplicationStrategyClass()),
-                                                                    StorageService.instance.getTokenMetadata(),
-                                                                    DatabaseDescriptor.getEndpointSnitch(),
-                                                                    attrs.getReplicationOptions());
+            KeyspaceParams params = attrs.asAlteredKeyspaceParams(ksm.params);
+            params.validate(name);
+            if (params.replication.klass.equals(LocalStrategy.class))
+                throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
         }
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
-        KSMetaData ksm = Schema.instance.getKSMetaData(name);
+        KSMetaData oldKsm = Schema.instance.getKSMetaData(name);
         // In the (very) unlikely case the keyspace was dropped since validate()
-        if (ksm == null)
+        if (oldKsm == null)
             throw new InvalidRequestException("Unknown keyspace " + name);
 
-        MigrationManager.announceKeyspaceUpdate(attrs.asKSMetadataUpdate(ksm), isLocalOnly);
+        KSMetaData newKsm = oldKsm.withSwapped(attrs.asAlteredKeyspaceParams(oldKsm.params));
+        MigrationManager.announceKeyspaceUpdate(newKsm, isLocalOnly);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
index a3e27e4..ac00222 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java
@@ -19,9 +19,11 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.Event;
@@ -30,7 +32,7 @@ import org.apache.cassandra.transport.Event;
 public class CreateKeyspaceStatement extends SchemaAlteringStatement
 {
     private final String name;
-    private final KSPropDefs attrs;
+    private final KeyspaceAttributes attrs;
     private final boolean ifNotExists;
 
     /**
@@ -40,7 +42,7 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
      * @param name the name of the keyspace to create
      * @param attrs map of the raw keyword arguments that followed the <code>WITH</code> keyword.
      */
-    public CreateKeyspaceStatement(String name, KSPropDefs attrs, boolean ifNotExists)
+    public CreateKeyspaceStatement(String name, KeyspaceAttributes attrs, boolean ifNotExists)
     {
         super();
         this.name = name;
@@ -84,18 +86,18 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement
         // The strategy is validated through KSMetaData.validate() in announceNewKeyspace below.
         // However, for backward compatibility with thrift, this doesn't validate unexpected options yet,
         // so doing proper validation here.
-        AbstractReplicationStrategy.validateReplicationStrategy(name,
-                                                                AbstractReplicationStrategy.getClass(attrs.getReplicationStrategyClass()),
-                                                                StorageService.instance.getTokenMetadata(),
-                                                                DatabaseDescriptor.getEndpointSnitch(),
-                                                                attrs.getReplicationOptions());
+        KeyspaceParams params = attrs.asNewKeyspaceParams();
+        params.validate(name);
+        if (params.replication.klass.equals(LocalStrategy.class))
+            throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
+        KSMetaData ksm = KSMetaData.create(name, attrs.asNewKeyspaceParams());
         try
         {
-            MigrationManager.announceNewKeyspace(attrs.asKSMetadata(name), isLocalOnly);
+            MigrationManager.announceNewKeyspace(ksm, isLocalOnly);
             return true;
         }
         catch (AlreadyExistsException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/cql3/statements/KSPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/KSPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/KSPropDefs.java
deleted file mode 100644
index 7c05435..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/KSPropDefs.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3.statements;
-
-import java.util.*;
-
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.exceptions.*;
-
-public class KSPropDefs extends PropertyDefinitions
-{
-    public static final String KW_DURABLE_WRITES = "durable_writes";
-    public static final String KW_REPLICATION = "replication";
-
-    public static final String REPLICATION_STRATEGY_CLASS_KEY = "class";
-
-    public static final Set<String> keywords = new HashSet<>();
-    public static final Set<String> obsoleteKeywords = new HashSet<>();
-
-    static
-    {
-        keywords.add(KW_DURABLE_WRITES);
-        keywords.add(KW_REPLICATION);
-    }
-
-    private String strategyClass;
-
-    public void validate() throws SyntaxException
-    {
-        // Skip validation if the strategy class is already set as it means we've alreayd
-        // prepared (and redoing it would set strategyClass back to null, which we don't want)
-        if (strategyClass != null)
-            return;
-
-        validate(keywords, obsoleteKeywords);
-
-        Map<String, String> replicationOptions = getReplicationOptions();
-        if (!replicationOptions.isEmpty())
-        {
-            strategyClass = replicationOptions.get(REPLICATION_STRATEGY_CLASS_KEY);
-            replicationOptions.remove(REPLICATION_STRATEGY_CLASS_KEY);
-        }
-    }
-
-    public Map<String, String> getReplicationOptions() throws SyntaxException
-    {
-        Map<String, String> replicationOptions = getMap(KW_REPLICATION);
-        if (replicationOptions == null)
-            return Collections.emptyMap();
-        return replicationOptions;
-    }
-
-    public String getReplicationStrategyClass()
-    {
-        return strategyClass;
-    }
-
-    public KSMetaData asKSMetadata(String ksName) throws RequestValidationException
-    {
-        return KSMetaData.newKeyspace(ksName, getReplicationStrategyClass(), getReplicationOptions(), getBoolean(KW_DURABLE_WRITES, true));
-    }
-
-    public KSMetaData asKSMetadataUpdate(KSMetaData old) throws RequestValidationException
-    {
-        String sClass = strategyClass;
-        Map<String, String> sOptions = getReplicationOptions();
-        if (sClass == null)
-        {
-            sClass = old.strategyClass.getName();
-            sOptions = old.strategyOptions;
-        }
-        return KSMetaData.newKeyspace(old.name, sClass, sOptions, getBoolean(KW_DURABLE_WRITES, old.durableWrites));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java b/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
new file mode 100644
index 0000000..d931530
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/KeyspaceAttributes.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.*;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.KeyspaceParams.Option;
+import org.apache.cassandra.schema.KeyspaceParams.Replication;
+
+/**
+ * The keyword arguments of a keyspace statement (durable writes and replication), convertible to
+ * {@link KeyspaceParams}.
+ */
+public class KeyspaceAttributes extends PropertyDefinitions
+{
+    private static final Set<String> keywords = ImmutableSet.of(Option.DURABLE_WRITES.toString(), Option.REPLICATION.toString());
+    private static final Set<String> obsoleteKeywords = ImmutableSet.of();
+
+    /** Runs the two-set keyword validation with the keyspace option names and an empty obsolete set. */
+    public void validate() throws SyntaxException
+    {
+        validate(keywords, obsoleteKeywords);
+    }
+
+    /** @return the value stored under {@link Replication#CLASS} in the replication map, or null if there is none */
+    public String getReplicationStrategyClass()
+    {
+        Map<String, String> all = getAllReplicationOptions();
+        return all.get(Replication.CLASS);
+    }
+
+    /** @return a fresh copy of the replication map with the {@link Replication#CLASS} entry taken out */
+    public Map<String, String> getReplicationOptions() throws SyntaxException
+    {
+        Map<String, String> withoutClass = new HashMap<>(getAllReplicationOptions());
+        withoutClass.remove(Replication.CLASS);
+        return withoutClass;
+    }
+
+    /** @return the replication map as given, class entry included; an empty map when getMap yields null */
+    public Map<String, String> getAllReplicationOptions() throws SyntaxException
+    {
+        Map<String, String> given = getMap(Option.REPLICATION.toString());
+        if (given == null)
+            return Collections.emptyMap();
+        return given;
+    }
+
+    /** Params for a new keyspace; {@link KeyspaceParams#DEFAULT_DURABLE_WRITES} is handed to getBoolean (presumably as the fallback). */
+    public KeyspaceParams asNewKeyspaceParams()
+    {
+        return KeyspaceParams.create(getBoolean(Option.DURABLE_WRITES.toString(), KeyspaceParams.DEFAULT_DURABLE_WRITES),
+                                     getAllReplicationOptions());
+    }
+
+    /**
+     * Params for an altered keyspace. The previous replication is kept unless a strategy class is given here;
+     * the previous durable-writes value is handed to getBoolean (presumably as the fallback).
+     *
+     * @param previous the params of the keyspace before the alteration
+     */
+    public KeyspaceParams asAlteredKeyspaceParams(KeyspaceParams previous)
+    {
+        boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), previous.durableWrites);
+        if (getReplicationStrategyClass() == null)
+            return new KeyspaceParams(durableWrites, previous.replication);
+
+        return new KeyspaceParams(durableWrites, Replication.fromMap(getAllReplicationOptions()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8fb83af..dac7f01 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1963,7 +1963,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // position in the System keyspace.
         logger.debug("truncating {}", name);
 
-        if (keyspace.metadata.durableWrites || DatabaseDescriptor.isAutoSnapshot())
+        if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
         {
             // flush the CF being truncated before forcing the new segment
             forceBlockingFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index fc7fc14..3c5a8c8 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -65,7 +65,7 @@ public class Keyspace
             DatabaseDescriptor.createAllDirectories();
     }
 
-    public final KSMetaData metadata;
+    private volatile KSMetaData metadata;
     public final OpOrder writeOrder = new OpOrder();
 
     /* ColumnFamilyStore per column family */
@@ -81,6 +81,7 @@ public class Keyspace
     };
 
     private static volatile boolean initialized = false;
+
     public static void setInitialized()
     {
         initialized = true;
@@ -165,6 +166,16 @@ public class Keyspace
         }
     }
 
+    public void setMetadata(KSMetaData metadata)
+    {
+        this.metadata = metadata;
+    }
+
+    public KSMetaData getMetadata()
+    {
+        return metadata;
+    }
+
     public Collection<ColumnFamilyStore> getColumnFamilyStores()
     {
         return Collections.unmodifiableCollection(columnFamilyStores.values());
@@ -294,10 +305,10 @@ public class Keyspace
     public void createReplicationStrategy(KSMetaData ksm)
     {
         replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
-                                                                                    ksm.strategyClass,
+                                                                                    ksm.params.replication.klass,
                                                                                     StorageService.instance.getTokenMetadata(),
                                                                                     DatabaseDescriptor.getEndpointSnitch(),
-                                                                                    ksm.strategyOptions);
+                                                                                    ksm.params.replication.options);
     }
 
     // best invoked on the compaction mananger.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 355d259..b2eaf8e 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -183,7 +183,7 @@ public class Mutation implements IMutation
     public void apply()
     {
         Keyspace ks = Keyspace.open(keyspaceName);
-        ks.apply(this, ks.metadata.durableWrites);
+        ks.apply(this, ks.getMetadata().params.durableWrites);
     }
 
     public void applyUnsafe()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index c20cd40..85e1263 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -49,12 +49,13 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.Functions;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.LegacySchemaTables;
 import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
@@ -267,7 +268,7 @@ public final class SystemKeyspace
 
     public static KSMetaData definition()
     {
-        return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables(), functions());
+        return KSMetaData.create(NAME, KeyspaceParams.local(), tables(), Types.none(), functions());
     }
 
     private static Tables tables()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 1a9fc99..44651be 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
@@ -38,7 +36,8 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.Pair;
 
@@ -374,13 +373,7 @@ public class CQLSSTableWriter implements Closeable
          */
         private static void createKeyspaceWithTable(CFMetaData table)
         {
-            KSMetaData ksm;
-            ksm = KSMetaData.newKeyspace(table.ksName,
-                                         AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
-                                         ImmutableMap.of("replication_factor", "1"),
-                                         true,
-                                         Collections.singleton(table));
-            Schema.instance.load(ksm);
+            Schema.instance.load(KSMetaData.create(table.ksName, KeyspaceParams.simple(1), Tables.of(table)));
         }
 
         /**
@@ -391,9 +384,8 @@ public class CQLSSTableWriter implements Closeable
          */
         private static void addTableToKeyspace(KSMetaData keyspace, CFMetaData table)
         {
-            KSMetaData clone = keyspace.cloneWith(keyspace.tables.with(table));
             Schema.instance.load(table);
-            Schema.instance.setKeyspaceDefinition(clone);
+            Schema.instance.setKeyspaceDefinition(keyspace.withSwapped(keyspace.tables.with(table)));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index ecc1687..1fec76f 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
@@ -40,14 +39,18 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public final class SystemDistributedKeyspace
 {
-    private static Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class);
+    private SystemDistributedKeyspace()
+    {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(SystemDistributedKeyspace.class);
 
     public static final String NAME = "system_distributed";
 
@@ -97,8 +100,7 @@ public final class SystemDistributedKeyspace
 
     public static KSMetaData definition()
     {
-        Tables tables = Tables.of(RepairHistory, ParentRepairHistory);
-        return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "3"), true, tables);
+        return KSMetaData.create(NAME, KeyspaceParams.simple(3), Tables.of(RepairHistory, ParentRepairHistory));
     }
 
     public static void startParentRepair(UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/schema/KeyspaceParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
new file mode 100644
index 0000000..a8de2bd
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.*;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * An immutable class representing keyspace parameters (durability and replication).
+ */
+public final class KeyspaceParams
+{
+    /** Durable-writes value used for a new keyspace by callers that have no explicit setting. */
+    public static final boolean DEFAULT_DURABLE_WRITES = true;
+
+    /**
+     * The keyspace-level options; {@link #toString()} yields the lower-case option name.
+     */
+    public enum Option
+    {
+        DURABLE_WRITES,
+        REPLICATION;
+
+        @Override
+        public String toString()
+        {
+            // Locale.ROOT keeps the result stable regardless of the JVM default locale
+            // (e.g. a Turkish default would otherwise lower-case 'I' to a dotless i).
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public final boolean durableWrites;
+    public final Replication replication;
+
+    public KeyspaceParams(boolean durableWrites, Replication replication)
+    {
+        this.durableWrites = durableWrites;
+        this.replication = replication;
+    }
+
+    /**
+     * Builds params from a durable-writes flag and a raw replication map.
+     *
+     * @param durableWrites the durable-writes flag
+     * @param replication replication options, including the strategy class under the {@link Replication#CLASS} key
+     */
+    public static KeyspaceParams create(boolean durableWrites, Map<String, String> replication)
+    {
+        return new KeyspaceParams(durableWrites, Replication.fromMap(replication));
+    }
+
+    /** Durable-writes params replicated with {@link LocalStrategy} and no options. */
+    public static KeyspaceParams local()
+    {
+        return new KeyspaceParams(true, Replication.local());
+    }
+
+    /** Durable-writes params replicated with {@link SimpleStrategy} at the given replication factor. */
+    public static KeyspaceParams simple(int replicationFactor)
+    {
+        return new KeyspaceParams(true, Replication.simple(replicationFactor));
+    }
+
+    /** Same as {@link #simple(int)}, but with durable writes turned off. */
+    public static KeyspaceParams simpleTransient(int replicationFactor)
+    {
+        return new KeyspaceParams(false, Replication.simple(replicationFactor));
+    }
+
+    /**
+     * Validates the replication settings for the keyspace with the given name.
+     *
+     * @param name the keyspace name, passed through to the replication strategy validation
+     */
+    public void validate(String name)
+    {
+        replication.validate(name);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof KeyspaceParams))
+            return false;
+
+        KeyspaceParams p = (KeyspaceParams) o;
+
+        return durableWrites == p.durableWrites && replication.equals(p.replication);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(durableWrites, replication);
+    }
+
+    @Override
+    public String toString()
+    {
+        return Objects.toStringHelper(this)
+                      .add(Option.DURABLE_WRITES.toString(), durableWrites)
+                      .add(Option.REPLICATION.toString(), replication)
+                      .toString();
+    }
+
+    /**
+     * The replication part of the params: a strategy class plus its options (the class itself is not an option).
+     */
+    public static final class Replication
+    {
+        /** Key under which the strategy class name is stored in the map form of the replication settings. */
+        public static final String CLASS = "class";
+
+        public final Class<? extends AbstractReplicationStrategy> klass;
+        public final ImmutableMap<String, String> options;
+
+        private Replication(Class<? extends AbstractReplicationStrategy> klass, Map<String, String> options)
+        {
+            this.klass = klass;
+            this.options = ImmutableMap.copyOf(options);
+        }
+
+        private static Replication local()
+        {
+            return new Replication(LocalStrategy.class, ImmutableMap.of());
+        }
+
+        private static Replication simple(int replicationFactor)
+        {
+            return new Replication(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor)));
+        }
+
+        public void validate(String name)
+        {
+            // Attempt to instantiate the ARS, which will throw a ConfigurationException if the options aren't valid.
+            TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+            IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch();
+            AbstractReplicationStrategy.validateReplicationStrategy(name, klass, tmd, eps, options);
+        }
+
+        /**
+         * Splits a raw replication map into the strategy class and the remaining options.
+         *
+         * @param map replication settings with the strategy class name under {@link #CLASS}; it is copied, not modified
+         */
+        public static Replication fromMap(Map<String, String> map)
+        {
+            Map<String, String> options = new HashMap<>(map);
+            // NOTE(review): className is null when the map has no "class" entry - confirm that callers
+            // rule this out or that AbstractReplicationStrategy.getClass reports it clearly.
+            String className = options.remove(CLASS);
+            Class<? extends AbstractReplicationStrategy> klass = AbstractReplicationStrategy.getClass(className);
+            return new Replication(klass, options);
+        }
+
+        /**
+         * Returns the options together with the strategy class name (as given by Class.getName()) under {@link #CLASS}.
+         *
+         * @return a new mutable map; changing it does not affect this object
+         */
+        public Map<String, String> asMap()
+        {
+            Map<String, String> map = new HashMap<>(options);
+            map.put(CLASS, klass.getName());
+            return map;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof Replication))
+                return false;
+
+            Replication r = (Replication) o;
+
+            return klass.equals(r.klass) && options.equals(r.options);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(klass, options);
+        }
+
+        @Override
+        public String toString()
+        {
+            Objects.ToStringHelper helper = Objects.toStringHelper(this);
+            helper.add(CLASS, klass.getName());
+            for (Map.Entry<String, String> entry : options.entrySet())
+                helper.add(entry.getKey(), entry.getValue());
+            return helper.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
index b3044ac..ffc3537 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -460,16 +459,14 @@ public final class LegacySchemaTables
     {
         for (FilteredPartition newPartition : after.values())
         {
+            String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
+            KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(newPartition.rowIterator());
+
             FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
             if (oldPartition == null || oldPartition.isEmpty())
-            {
-                Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(newPartition.rowIterator()));
-            }
+                Schema.instance.addKeyspace(KSMetaData.create(name, params));
             else
-            {
-                String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
-                Schema.instance.updateKeyspace(name);
-            }
+                Schema.instance.updateKeyspace(name, params);
         }
 
         // What's remain in old is those keyspace that are not in updated, i.e. the dropped ones.
@@ -478,7 +475,7 @@ public final class LegacySchemaTables
 
     private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
     {
-        Set<String> names = new HashSet(keys.size());
+        Set<String> names = new HashSet<>(keys.size());
         for (DecoratedKey key : keys)
             names.add(AsciiType.instance.compose(key.getKey()));
         return names;
@@ -653,9 +650,9 @@ public final class LegacySchemaTables
         // Note that because Keyspaces is a COMPACT TABLE, we're really only setting static columns internally and shouldn't set any clustering.
         RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, keyspace.name);
 
-        adder.add("durable_writes", keyspace.durableWrites);
-        adder.add("strategy_class", keyspace.strategyClass.getName());
-        adder.add("strategy_options", json(keyspace.strategyOptions));
+        adder.add("durable_writes", keyspace.params.durableWrites);
+        adder.add("strategy_class", keyspace.params.replication.klass.getName());
+        adder.add("strategy_options", json(keyspace.params.replication.options));
 
         Mutation mutation = adder.build();
 
@@ -680,45 +677,41 @@ public final class LegacySchemaTables
         return mutation;
     }
 
-    private static KSMetaData createKeyspaceFromSchemaPartitions(RowIterator serializedKeyspace,
+    private static KSMetaData createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
                                                                  RowIterator serializedTables,
                                                                  RowIterator serializedTypes,
                                                                  RowIterator serializedFunctions,
-                                                                 RowIterator seriazliedAggregates)
+                                                                 RowIterator serializedAggregates)
     {
+        String name = AsciiType.instance.compose(serializedParams.partitionKey().getKey());
+
+        KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams);
         Tables tables = createTablesFromTablesPartition(serializedTables);
         Types types = createTypesFromPartition(serializedTypes);
+
         Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions);
-        Collection<UDAggregate> udas = createAggregatesFromAggregatesPartition(seriazliedAggregates);
+        Collection<UDAggregate> udas = createAggregatesFromAggregatesPartition(serializedAggregates);
         Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
-        return createKeyspaceFromSchemaPartition(serializedKeyspace).cloneWith(tables, types, functions);
-    }
 
-    public static KSMetaData createKeyspaceFromName(String keyspace)
-    {
-        return readSchemaPartitionForKeyspaceAndApply(KEYSPACES, keyspace, partition ->
-        {
-            if (partition.isEmpty())
-                throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
-
-            return createKeyspaceFromSchemaPartition(partition);
-        });
+        return KSMetaData.create(name, params, tables, types, functions);
     }
 
-
     /**
      * Deserialize only Keyspace attributes without nested tables or types
      *
      * @param partition Keyspace attributes in serialized form
      */
-    private static KSMetaData createKeyspaceFromSchemaPartition(RowIterator partition)
+
+    private static KeyspaceParams createKeyspaceParamsFromSchemaPartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES);
         UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
-        return new KSMetaData(row.getString("keyspace_name"),
-                              AbstractReplicationStrategy.getClass(row.getString("strategy_class")),
-                              fromJsonMap(row.getString("strategy_options")),
-                              row.getBoolean("durable_writes"));
+
+        Map<String, String> replicationMap = new HashMap<>();
+        replicationMap.putAll(fromJsonMap(row.getString("strategy_options")));
+        replicationMap.put("class", row.getString("strategy_class"));
+
+        return KeyspaceParams.create(row.getBoolean("durable_writes"), replicationMap);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1da18e0..401057c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -645,11 +645,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 for (Keyspace keyspace : Keyspace.all())
                 {
                     KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName());
-                    if (!ksm.durableWrites)
-                    {
+                    if (!ksm.params.durableWrites)
                         for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
                             flushes.add(cfs.forceFlush());
-                    }
                 }
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 69332cf..9c761d7 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.schema.LegacySchemaTables;
@@ -158,11 +159,12 @@ public class ThriftConversion
         if (cls.equals(LocalStrategy.class))
             throw new ConfigurationException("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
 
-        return new KSMetaData(ksd.name,
-                              cls,
-                              ksd.strategy_options == null ? Collections.<String, String>emptyMap() : ksd.strategy_options,
-                              ksd.durable_writes,
-                              Tables.of(cfDefs));
+        Map<String, String> replicationMap = new HashMap<>();
+        if (ksd.strategy_options != null)
+            replicationMap.putAll(ksd.strategy_options);
+        replicationMap.put(KeyspaceParams.Replication.CLASS, cls.getName());
+
+        return KSMetaData.create(ksd.name, KeyspaceParams.create(ksd.durable_writes, replicationMap), Tables.of(cfDefs));
     }
 
     public static KsDef toThrift(KSMetaData ksm)
@@ -172,9 +174,9 @@ public class ThriftConversion
             if (cfm.isThriftCompatible()) // Don't expose CF that cannot be correctly handle by thrift; see CASSANDRA-4377 for further details
                 cfDefs.add(toThrift(cfm));
 
-        KsDef ksdef = new KsDef(ksm.name, ksm.strategyClass.getName(), cfDefs);
-        ksdef.setStrategy_options(ksm.strategyOptions);
-        ksdef.setDurable_writes(ksm.durableWrites);
+        KsDef ksdef = new KsDef(ksm.name, ksm.params.replication.klass.getName(), cfDefs);
+        ksdef.setStrategy_options(ksm.params.replication.options);
+        ksdef.setDurable_writes(ksm.params.durableWrites);
 
         return ksdef;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 2dd906d..8eab93c 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -21,19 +21,21 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RowUpdateBuilder;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 public final class TraceKeyspace
 {
+    private TraceKeyspace()
+    {
+    }
+
     public static final String NAME = "system_traces";
 
     public static final String SESSIONS = "sessions";
@@ -73,8 +75,7 @@ public final class TraceKeyspace
 
     public static KSMetaData definition()
     {
-        Tables tables = Tables.of(Sessions, Events);
-        return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, tables);
+        return KSMetaData.create(NAME, KeyspaceParams.simple(2), Tables.of(Sessions, Events));
     }
 
     static Mutation makeStartSessionMutation(ByteBuffer sessionId,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index ca223ca..56a7e86 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 
 import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -37,7 +36,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableUtils;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import static org.junit.Assert.assertEquals;
@@ -54,8 +53,7 @@ public class LongCompactionsTest
         compactionOptions.put("tombstone_compaction_interval", "1");
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
+                                    KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)
                                                 .compactionStrategyOptions(compactionOptions));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index fbee72a..97fd3b3 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -28,14 +28,11 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
-import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.junit.Assert.assertTrue;
-
 public class LongLeveledCompactionStrategyTest
 {
     public static final String KEYSPACE1 = "LongLeveledCompactionStrategyTest";
@@ -48,8 +45,7 @@ public class LongLeveledCompactionStrategyTest
         leveledOptions.put("sstable_size_in_mb", "1");
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
+                                    KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDLVL)
                                                 .compactionStrategyClass(LeveledCompactionStrategy.class)
                                                 .compactionStrategyOptions(leveledOptions));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index f0a849b..6aa40d2 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -24,7 +24,6 @@ import java.io.RandomAccessFile;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.cache.CachingOptions;
@@ -46,7 +45,7 @@ import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -59,7 +58,7 @@ public class MockSchema
         indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1);
     }
     private static final AtomicInteger id = new AtomicInteger();
-    public static final Keyspace ks = Keyspace.mockKS(new KSMetaData("mockks", SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), false));
+    public static final Keyspace ks = Keyspace.mockKS(KSMetaData.create("mockks", KeyspaceParams.simpleTransient(1)));
 
     private static final IndexSummary indexSummary;
     private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 43ba0ba..b3b4a64 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,8 +41,8 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -99,13 +99,6 @@ public class SchemaLoader
         String ks_prsi = testName + "PerRowSecondaryIndex";
         String ks_cql = testName + "cql_keyspace";
 
-        Class<? extends AbstractReplicationStrategy> simple = SimpleStrategy.class;
-
-        Map<String, String> opts_rf1 = KSMetaData.optsWithRF(1);
-        Map<String, String> opts_rf2 = KSMetaData.optsWithRF(2);
-        Map<String, String> opts_rf3 = KSMetaData.optsWithRF(3);
-        Map<String, String> opts_rf5 = KSMetaData.optsWithRF(5);
-
         AbstractType bytes = BytesType.instance;
 
         AbstractType<?> composite = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{BytesType.instance, TimeUUIDType.instance, IntegerType.instance}));
@@ -124,10 +117,9 @@ public class SchemaLoader
         leveledOptions.put("sstable_size_in_mb", "1");
 
         // Keyspace 1
-        schema.add(KSMetaData.testMetadata(ks1,
-                simple,
-                opts_rf1,
-
+        schema.add(KSMetaData.create(ks1,
+                KeyspaceParams.simple(1),
+                Tables.of(
                 // Column Families
                 standardCFMD(ks1, "Standard1").compactionStrategyOptions(compactionOptions),
                 standardCFMD(ks1, "Standard2"),
@@ -171,13 +163,12 @@ public class SchemaLoader
                 //CFMetaData.Builder.create(ks1, "MixedTypes").withColumnNameComparator(LongType.instance).addPartitionKey("key", UUIDType.instance).build(),
                 //CFMetaData.Builder.create(ks1, "MixedTypesComposite", false, true, false).withColumnNameComparator(composite).addPartitionKey("key", composite).build(),
                 //CFMetaData.Builder.create(ks1, "AsciiKeys").addPartitionKey("key", AsciiType.instance).build()
-        ));
+        )));
 
         // Keyspace 2
-        schema.add(KSMetaData.testMetadata(ks2,
-                simple,
-                opts_rf1,
-
+        schema.add(KSMetaData.create(ks2,
+                KeyspaceParams.simple(1),
+                Tables.of(
                 // Column Families
                 standardCFMD(ks2, "Standard1"),
                 standardCFMD(ks2, "Standard3"),
@@ -185,58 +176,51 @@ public class SchemaLoader
                 superCFMD(ks2, "Super4", TimeUUIDType.instance),
                 keysIndexCFMD(ks2, "Indexed1", true),
                 compositeIndexCFMD(ks2, "Indexed2", true),
-                compositeIndexCFMD(ks2, "Indexed3", true).gcGraceSeconds(0)));
+                compositeIndexCFMD(ks2, "Indexed3", true).gcGraceSeconds(0))));
 
         // Keyspace 3
-        schema.add(KSMetaData.testMetadata(ks3,
-                simple,
-                opts_rf5,
-
-                // Column Families
+        schema.add(KSMetaData.create(ks3,
+                KeyspaceParams.simple(5),
+                Tables.of(
                 standardCFMD(ks3, "Standard1"),
-                keysIndexCFMD(ks3, "Indexed1", true)));
+                keysIndexCFMD(ks3, "Indexed1", true))));
 
         // Keyspace 4
-        schema.add(KSMetaData.testMetadata(ks4,
-                simple,
-                opts_rf3,
-
-                // Column Families
+        schema.add(KSMetaData.create(ks4,
+                KeyspaceParams.simple(3),
+                Tables.of(
                 standardCFMD(ks4, "Standard1"),
                 standardCFMD(ks4, "Standard3"),
                 superCFMD(ks4, "Super3", bytes),
                 superCFMD(ks4, "Super4", TimeUUIDType.instance),
-                superCFMD(ks4, "Super5", TimeUUIDType.instance, BytesType.instance)));
+                superCFMD(ks4, "Super5", TimeUUIDType.instance, BytesType.instance))));
 
         // Keyspace 5
-        schema.add(KSMetaData.testMetadata(ks5,
-                simple,
-                opts_rf2,
-                standardCFMD(ks5, "Standard1")));
-
+        schema.add(KSMetaData.create(ks5,
+                KeyspaceParams.simple(2),
+                Tables.of(standardCFMD(ks5, "Standard1"))));
         // Keyspace 6
-        schema.add(KSMetaData.testMetadata(ks6,
-                simple,
-                opts_rf1,
-                keysIndexCFMD(ks6, "Indexed1", true)));
+        schema.add(KSMetaData.create(ks6,
+                KeyspaceParams.simple(1),
+                Tables.of(keysIndexCFMD(ks6, "Indexed1", true))));
 
         // KeyCacheSpace
-        schema.add(KSMetaData.testMetadata(ks_kcs,
-                simple,
-                opts_rf1,
+        schema.add(KSMetaData.create(ks_kcs,
+                KeyspaceParams.simple(1),
+                Tables.of(
                 standardCFMD(ks_kcs, "Standard1"),
                 standardCFMD(ks_kcs, "Standard2"),
-                standardCFMD(ks_kcs, "Standard3")));
+                standardCFMD(ks_kcs, "Standard3"))));
 
         // RowCacheSpace
-        schema.add(KSMetaData.testMetadata(ks_rcs,
-                simple,
-                opts_rf1,
+        schema.add(KSMetaData.create(ks_rcs,
+                KeyspaceParams.simple(1),
+                Tables.of(
                 standardCFMD(ks_rcs, "CFWithoutCache").caching(CachingOptions.NONE),
                 standardCFMD(ks_rcs, "CachedCF").caching(CachingOptions.ALL),
                 standardCFMD(ks_rcs, "CachedIntCF").
                         caching(new CachingOptions(new CachingOptions.KeyCache(CachingOptions.KeyCache.Type.ALL),
-                                new CachingOptions.RowCache(CachingOptions.RowCache.Type.HEAD, 100)))));
+                                new CachingOptions.RowCache(CachingOptions.RowCache.Type.HEAD, 100))))));
 
         // CounterCacheSpace
         /*schema.add(KSMetaData.testMetadata(ks_ccs,
@@ -245,21 +229,15 @@ public class SchemaLoader
                 CFMetaData.Builder.create(ks_ccs, "Counter1", false, false, true).build(),
                 CFMetaData.Builder.create(ks_ccs, "Counter1", false, false, true).build()));*/
 
-        schema.add(KSMetaData.testMetadataNotDurable(ks_nocommit,
-                simple,
-                opts_rf1,
-                standardCFMD(ks_nocommit, "Standard1")));
+        schema.add(KSMetaData.create(ks_nocommit, KeyspaceParams.simpleTransient(1), Tables.of(
+                standardCFMD(ks_nocommit, "Standard1"))));
 
         // PerRowSecondaryIndexTest
-        schema.add(KSMetaData.testMetadata(ks_prsi,
-                simple,
-                opts_rf1,
-                perRowIndexedCFMD(ks_prsi, "Indexed1")));
+        schema.add(KSMetaData.create(ks_prsi, KeyspaceParams.simple(1), Tables.of(
+                perRowIndexedCFMD(ks_prsi, "Indexed1"))));
 
         // CQLKeyspace
-        schema.add(KSMetaData.testMetadata(ks_cql,
-                simple,
-                opts_rf1,
+        schema.add(KSMetaData.create(ks_cql, KeyspaceParams.simple(1), Tables.of(
 
                 // Column Families
                 CFMetaData.compile("CREATE TABLE table1 ("
@@ -287,7 +265,7 @@ public class SchemaLoader
                         + "foo text, "
                         + "PRIMARY KEY((bar, baz), qux, quz) ) "
                         + "WITH COMPACT STORAGE", ks_cql)
-        ));
+        )));
 
 
         if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")))
@@ -299,24 +277,9 @@ public class SchemaLoader
             MigrationManager.announceNewKeyspace(ksm, false);
     }
 
-    public static void createKeyspace(String keyspaceName,
-                                      Class<? extends AbstractReplicationStrategy> strategy,
-                                      Map<String, String> options,
-                                      CFMetaData... cfmetas) throws ConfigurationException
-    {
-        createKeyspace(keyspaceName, true, true, strategy, options, cfmetas);
-    }
-
-    public static void createKeyspace(String keyspaceName,
-                                      boolean durable,
-                                      boolean announceLocally,
-                                      Class<? extends AbstractReplicationStrategy> strategy,
-                                      Map<String, String> options,
-                                      CFMetaData... cfmetas) throws ConfigurationException
+    public static void createKeyspace(String name, KeyspaceParams params, CFMetaData... tables)
     {
-        KSMetaData ksm = durable ? KSMetaData.testMetadata(keyspaceName, strategy, options, cfmetas)
-                                 : KSMetaData.testMetadataNotDurable(keyspaceName, strategy, options, cfmetas);
-        MigrationManager.announceNewKeyspace(ksm, announceLocally);
+        MigrationManager.announceNewKeyspace(KSMetaData.create(name, params, Tables.of(tables)), true);
     }
 
     public static ColumnDefinition integerColumn(String ksName, String cfName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/31e3f612/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index e7a1706..fb5d84f 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cache;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -29,9 +28,8 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -45,12 +43,11 @@ public class AutoSavingCacheTest
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
-                SimpleStrategy.class,
-                KSMetaData.optsWithRF(1),
-                CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
-                    .addPartitionKey("pKey", AsciiType.instance)
-                    .addRegularColumn("col1", AsciiType.instance)
-                    .build());
+                                    KeyspaceParams.simple(1),
+                                    CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
+                                                      .addPartitionKey("pKey", AsciiType.instance)
+                                                      .addRegularColumn("col1", AsciiType.instance)
+                                                      .build());
     }
 
     @Test