You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:16 UTC

[10/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index 975eb8e..30fbb0d 100644
--- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -26,9 +26,9 @@ import java.util.List;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.marshal.*;
@@ -45,7 +45,7 @@ import static org.junit.Assert.assertTrue;
 
 public class SelectionColumnMappingTest extends CQLTester
 {
-    private static final ColumnDefinition NULL_DEF = null;
+    private static final ColumnMetadata NULL_DEF = null;
     String tableName;
     String typeName;
     UserType userType;
@@ -71,7 +71,7 @@ public class SelectionColumnMappingTest extends CQLTester
                                 " v1 int," +
                                 " v2 ascii," +
                                 " v3 frozen<" + typeName + ">)");
-        userType = Schema.instance.getKSMetaData(KEYSPACE).types.get(ByteBufferUtil.bytes(typeName)).get().freeze();
+        userType = Schema.instance.getKeyspaceMetadata(KEYSPACE).types.get(ByteBufferUtil.bytes(typeName)).get().freeze();
         functionName = createFunction(KEYSPACE, "int, ascii",
                                       "CREATE FUNCTION %s (i int, a ascii) " +
                                       "CALLED ON NULL INPUT " +
@@ -130,7 +130,7 @@ public class SelectionColumnMappingTest extends CQLTester
     private void testSimpleTypes() throws Throwable
     {
         // simple column identifiers without aliases are represented in
-        // ResultSet.Metadata by the underlying ColumnDefinition
+        // ResultSet.Metadata by the underlying ColumnMetadata
         ColumnSpecification kSpec = columnSpecification("k", Int32Type.instance);
         ColumnSpecification v1Spec = columnSpecification("v1", Int32Type.instance);
         ColumnSpecification v2Spec = columnSpecification("v2", AsciiType.instance);
@@ -144,12 +144,12 @@ public class SelectionColumnMappingTest extends CQLTester
 
     private void testWildcard() throws Throwable
     {
-        // Wildcard select represents each column in the table with a ColumnDefinition
+        // Wildcard select represents each column in the table with a ColumnMetadata
         // in the ResultSet metadata
-        ColumnDefinition kSpec = columnDefinition("k");
-        ColumnDefinition v1Spec = columnDefinition("v1");
-        ColumnDefinition v2Spec = columnDefinition("v2");
-        ColumnDefinition v3Spec = columnDefinition("v3");
+        ColumnMetadata kSpec = columnDefinition("k");
+        ColumnMetadata v1Spec = columnDefinition("v1");
+        ColumnMetadata v2Spec = columnDefinition("v2");
+        ColumnMetadata v3Spec = columnDefinition("v3");
         SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
                                                                 .addMapping(kSpec, columnDefinition("k"))
                                                                 .addMapping(v1Spec, columnDefinition("v1"))
@@ -162,7 +162,7 @@ public class SelectionColumnMappingTest extends CQLTester
     private void testSimpleTypesWithAliases() throws Throwable
     {
         // simple column identifiers with aliases are represented in ResultSet.Metadata
-        // by a ColumnSpecification based on the underlying ColumnDefinition
+        // by a ColumnSpecification based on the underlying ColumnMetadata
         ColumnSpecification kSpec = columnSpecification("k_alias", Int32Type.instance);
         ColumnSpecification v1Spec = columnSpecification("v1_alias", Int32Type.instance);
         ColumnSpecification v2Spec = columnSpecification("v2_alias", AsciiType.instance);
@@ -406,7 +406,7 @@ public class SelectionColumnMappingTest extends CQLTester
     private void testMultipleUnaliasedSelectionOfSameColumn() throws Throwable
     {
         // simple column identifiers without aliases are represented in
-        // ResultSet.Metadata by the underlying ColumnDefinition
+        // ResultSet.Metadata by the underlying ColumnMetadata
         SelectionColumns expected = SelectionColumnMapping.newMapping()
                                                           .addMapping(columnSpecification("v1", Int32Type.instance),
                                                                       columnDefinition("v1"))
@@ -430,7 +430,7 @@ public class SelectionColumnMappingTest extends CQLTester
     {
         ColumnSpecification listSpec = columnSpecification("(list<int>)[]", ListType.getInstance(Int32Type.instance, false));
         SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
-                                                                .addMapping(listSpec, (ColumnDefinition) null);
+                                                                .addMapping(listSpec, (ColumnMetadata) null);
 
         verify(expected, "SELECT (list<int>)[] FROM %s");
     }
@@ -449,7 +449,7 @@ public class SelectionColumnMappingTest extends CQLTester
     {
         ColumnSpecification setSpec = columnSpecification("(set<int>){}", SetType.getInstance(Int32Type.instance, false));
         SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
-                                                                .addMapping(setSpec, (ColumnDefinition) null);
+                                                                .addMapping(setSpec, (ColumnMetadata) null);
 
         verify(expected, "SELECT (set<int>){} FROM %s");
     }
@@ -467,7 +467,7 @@ public class SelectionColumnMappingTest extends CQLTester
     {
         ColumnSpecification mapSpec = columnSpecification("(map<text, int>){}", MapType.getInstance(UTF8Type.instance, Int32Type.instance, false));
         SelectionColumnMapping expected = SelectionColumnMapping.newMapping()
-                                                                .addMapping(mapSpec, (ColumnDefinition) null);
+                                                                .addMapping(mapSpec, (ColumnMetadata) null);
 
         verify(expected, "SELECT (map<text, int>){} FROM %s");
     }
@@ -557,7 +557,7 @@ public class SelectionColumnMappingTest extends CQLTester
                                        " LANGUAGE javascript" +
                                        " AS 'a*a'");
 
-        ColumnDefinition v1 = columnDefinition("v1");
+        ColumnMetadata v1 = columnDefinition("v1");
         SelectionColumns expected = SelectionColumnMapping.newMapping()
                                                           .addMapping(columnSpecification(aFunc + "(v1)",
                                                                                           Int32Type.instance),
@@ -600,18 +600,17 @@ public class SelectionColumnMappingTest extends CQLTester
         assertEquals(expected, select.getSelection().getColumnMapping());
     }
 
-    private Iterable<ColumnDefinition> columnDefinitions(String...names)
+    private Iterable<ColumnMetadata> columnDefinitions(String...names)
     {
-        List<ColumnDefinition> defs = new ArrayList<>();
+        List<ColumnMetadata> defs = new ArrayList<>();
         for (String n : names)
             defs.add(columnDefinition(n));
         return defs;
     }
 
-    private ColumnDefinition columnDefinition(String name)
+    private ColumnMetadata columnDefinition(String name)
     {
-        return Schema.instance.getCFMetaData(KEYSPACE, tableName)
-                              .getColumnDefinition(new ColumnIdentifier(name, true));
+        return Schema.instance.getTableMetadata(KEYSPACE, tableName).getColumn(new ColumnIdentifier(name, true));
 
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 15a93a1..f098126 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -25,8 +25,8 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -111,9 +111,9 @@ public class SecondaryIndexTest extends CQLTester
 
         // IF NOT EXISTS should apply in cases where the new index differs from an existing one in name only
         String otherIndexName = "index_" + System.nanoTime();
-        assertEquals(1, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertEquals(1, getCurrentColumnFamilyStore().metadata().indexes.size());
         createIndex("CREATE INDEX IF NOT EXISTS " + otherIndexName + " ON %s(b)");
-        assertEquals(1, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertEquals(1, getCurrentColumnFamilyStore().metadata().indexes.size());
         assertInvalidMessage(String.format("Index %s is a duplicate of existing index %s",
                                            removeQuotes(otherIndexName.toLowerCase(Locale.US)),
                                            removeQuotes(indexName.toLowerCase(Locale.US))),
@@ -845,11 +845,11 @@ public class SecondaryIndexTest extends CQLTester
         createIndex(String.format("CREATE CUSTOM INDEX c_idx_2 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'b'}", indexClassName));
 
         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-        CFMetaData cfm = cfs.metadata;
-        StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
+        TableMetadata cfm = cfs.metadata();
+        StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.indexes
                                                                    .get("c_idx_1")
                                                                    .orElseThrow(throwAssert("index not found")));
-        StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
+        StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.indexes
                                                                    .get("c_idx_2")
                                                                    .orElseThrow(throwAssert("index not found")));
         Object[] row1a = row(0, 0, 0);
@@ -887,8 +887,8 @@ public class SecondaryIndexTest extends CQLTester
         createIndex(String.format("CREATE CUSTOM INDEX c_idx ON %%s(c) USING '%s'", indexClassName));
 
         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-        CFMetaData cfm = cfs.metadata;
-        StubIndex index1 = (StubIndex) cfs.indexManager.getIndex(cfm.getIndexes()
+        TableMetadata cfm = cfs.metadata();
+        StubIndex index1 = (StubIndex) cfs.indexManager.getIndex(cfm.indexes
                 .get("c_idx")
                 .orElseThrow(throwAssert("index not found")));
 
@@ -943,8 +943,8 @@ public class SecondaryIndexTest extends CQLTester
         createIndex(String.format("CREATE CUSTOM INDEX test_index ON %%s() USING '%s'", StubIndex.class.getName()));
         execute("INSERT INTO %s (k, c, v1, v2) VALUES (0, 0, 0, 0) USING TIMESTAMP 0");
 
-        ColumnDefinition v1 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v1", true));
-        ColumnDefinition v2 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v2", true));
+        ColumnMetadata v1 = getCurrentColumnFamilyStore().metadata().getColumn(new ColumnIdentifier("v1", true));
+        ColumnMetadata v2 = getCurrentColumnFamilyStore().metadata().getColumn(new ColumnIdentifier("v2", true));
 
         StubIndex index = (StubIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName("test_index");
         assertEquals(1, index.rowsInserted.size());
@@ -1336,16 +1336,16 @@ public class SecondaryIndexTest extends CQLTester
                                       ClientState.forInternalCalls());
     }
 
-    private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp)
+    private void validateCell(Cell cell, ColumnMetadata def, ByteBuffer val, long timestamp)
     {
         assertNotNull(cell);
         assertEquals(0, def.type.compare(cell.value(), val));
         assertEquals(timestamp, cell.timestamp());
     }
 
-    private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
+    private static void assertColumnValue(int expected, String name, Row row, TableMetadata cfm)
     {
-        ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
+        ColumnMetadata col = cfm.getColumn(new ColumnIdentifier(name, true));
         AbstractType<?> type = col.type;
         assertEquals(expected, type.compose(row.getCell(col).value()));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
index 3affe9a..ecff0cc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.functions.FunctionName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
index a7c09b1..187871a 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFJavaTest.java
@@ -33,15 +33,14 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.TupleType;
 import com.datastax.driver.core.TupleValue;
 import com.datastax.driver.core.UDTValue;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
 
 public class UFJavaTest extends CQLTester
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 74f6409..7940b92 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -38,6 +37,7 @@ import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -823,7 +823,7 @@ public class UFTest extends CQLTester
                                       "LANGUAGE JAVA\n" +
                                       "AS 'throw new RuntimeException();';");
 
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(KEYSPACE_PER_TEST);
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(KEYSPACE_PER_TEST);
         UDFunction f = (UDFunction) ksm.functions.get(parseFunctionName(fName)).iterator().next();
 
         UDFunction broken = UDFunction.createBrokenFunction(f.name(),
@@ -834,7 +834,7 @@ public class UFTest extends CQLTester
                                                             "java",
                                                             f.body(),
                                                             new InvalidRequestException("foo bar is broken"));
-        Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.functions.without(f.name(), f.argTypes()).with(broken)));
+        Schema.instance.load(ksm.withSwapped(ksm.functions.without(f.name(), f.argTypes()).with(broken)));
 
         assertInvalidThrowMessage("foo bar is broken", InvalidRequestException.class,
                                   "SELECT key, " + fName + "(dval) FROM %s");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
index 2760ae5..becbdad 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
@@ -135,7 +135,7 @@ public class CrcCheckChanceTest extends CQLTester
             alterTable("ALTER TABLE %s WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance': 0.5}");
 
         //We should be able to get the new value by accessing directly the schema metadata
-        Assert.assertEquals(0.5, cfs.metadata.params.crcCheckChance);
+        Assert.assertEquals(0.5, cfs.metadata().params.crcCheckChance);
 
         //but previous JMX-set value will persist until next restart
         Assert.assertEquals(0.01, cfs.getLiveSSTables().iterator().next().getCrcCheckChance());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index cfedc08..c79c596 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -40,8 +40,8 @@ import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.spi.TurboFilterList;
 import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
 import ch.qos.logback.classic.turbo.TurboFilter;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -1282,7 +1282,7 @@ public class AggregationTest extends CQLTester
                                    "SFUNC " + shortFunctionName(fState) + " " +
                                    "STYPE int ");
 
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace());
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace());
         UDAggregate f = (UDAggregate) ksm.functions.get(parseFunctionName(a)).iterator().next();
 
         UDAggregate broken = UDAggregate.createBroken(f.name(),
@@ -1291,7 +1291,7 @@ public class AggregationTest extends CQLTester
                                                       null,
                                                       new InvalidRequestException("foo bar is broken"));
 
-        Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.functions.without(f.name(), f.argTypes()).with(broken)));
+        Schema.instance.load(ksm.withSwapped(ksm.functions.without(f.name(), f.argTypes()).with(broken)));
 
         assertInvalidThrowMessage("foo bar is broken", InvalidRequestException.class,
                                   "SELECT " + a + "(val) FROM %s");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 3f98df1..88c6a3f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.cql3.validation.operations;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -183,12 +183,12 @@ public class AlterTest extends CQLTester
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName);
 
         alterTable("ALTER TABLE %s WITH min_index_interval=256 AND max_index_interval=512");
-        assertEquals(256, cfs.metadata.params.minIndexInterval);
-        assertEquals(512, cfs.metadata.params.maxIndexInterval);
+        assertEquals(256, cfs.metadata().params.minIndexInterval);
+        assertEquals(512, cfs.metadata().params.maxIndexInterval);
 
         alterTable("ALTER TABLE %s WITH caching = {}");
-        assertEquals(256, cfs.metadata.params.minIndexInterval);
-        assertEquals(512, cfs.metadata.params.maxIndexInterval);
+        assertEquals(256, cfs.metadata().params.minIndexInterval);
+        assertEquals(512, cfs.metadata().params.maxIndexInterval);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index b92b0df..6339a46 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -26,10 +26,11 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
 import org.apache.cassandra.db.Mutation;
@@ -819,14 +820,14 @@ public class CreateTest extends CQLTester
 
     private void assertTriggerExists(String name)
     {
-        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), currentTable()).copy();
-        assertTrue("the trigger does not exist", cfm.getTriggers().get(name).isPresent());
+        TableMetadata metadata = Schema.instance.getTableMetadata(keyspace(), currentTable());
+        assertTrue("the trigger does not exist", metadata.triggers.get(name).isPresent());
     }
 
     private void assertTriggerDoesNotExists(String name)
     {
-        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), currentTable()).copy();
-        assertFalse("the trigger exists", cfm.getTriggers().get(name).isPresent());
+        TableMetadata metadata = Schema.instance.getTableMetadata(keyspace(), currentTable());
+        assertFalse("the trigger exists", metadata.triggers.get(name).isPresent());
     }
 
     public static class TestTrigger implements ITrigger

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java
index f491d24..19aba64 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DropRecreateAndRestoreTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.validation.operations;
 
 import java.io.File;
 import java.util.List;
-import java.util.UUID;
 
 import org.junit.Test;
 
@@ -30,6 +29,7 @@ import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableId;
 
 public class DropRecreateAndRestoreTest extends CQLTester
 {
@@ -43,7 +43,7 @@ public class DropRecreateAndRestoreTest extends CQLTester
 
 
         long time = System.currentTimeMillis();
-        UUID id = currentTableMetadata().cfId;
+        TableId id = currentTableMetadata().id;
         assertRows(execute("SELECT * FROM %s"), row(0, 0, 0), row(0, 1, 1));
         Thread.sleep(5);
 
@@ -84,7 +84,7 @@ public class DropRecreateAndRestoreTest extends CQLTester
     public void testCreateWithIdDuplicate() throws Throwable
     {
         createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY(a, b))");
-        UUID id = currentTableMetadata().cfId;
+        TableId id = currentTableMetadata().id;
         execute(String.format("CREATE TABLE %%s (a int, b int, c int, PRIMARY KEY(a, b)) WITH ID = %s", id));
     }
 
@@ -98,7 +98,7 @@ public class DropRecreateAndRestoreTest extends CQLTester
     public void testAlterWithId() throws Throwable
     {
         createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY(a, b))");
-        UUID id = currentTableMetadata().cfId;
+        TableId id = currentTableMetadata().id;
         execute(String.format("ALTER TABLE %%s WITH ID = %s", id));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index b5f8b6c..7cbe085 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import org.junit.Test;
 
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/CellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java
index 86fb20e..febfa3c 100644
--- a/test/unit/org/apache/cassandra/db/CellTest.java
+++ b/test/unit/org/apache/cassandra/db/CellTest.java
@@ -27,16 +27,16 @@ import junit.framework.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.FieldIdentifier;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -54,13 +54,14 @@ public class CellTest
     private static final String CF_STANDARD1 = "Standard1";
     private static final String CF_COLLECTION = "Collection1";
 
-    private static final CFMetaData cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1);
-    private static final CFMetaData cfm2 = CFMetaData.Builder.create(KEYSPACE1, CF_COLLECTION)
-                                                             .addPartitionKey("k", IntegerType.instance)
-                                                             .addClusteringColumn("c", IntegerType.instance)
-                                                             .addRegularColumn("v", IntegerType.instance)
-                                                             .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
-                                                             .build();
+    private static final TableMetadata cfm = SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1).build();
+    private static final TableMetadata cfm2 =
+        TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
+                     .addPartitionKeyColumn("k", IntegerType.instance)
+                     .addClusteringColumn("c", IntegerType.instance)
+                     .addRegularColumn("v", IntegerType.instance)
+                     .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
+                     .build();
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -69,14 +70,14 @@ public class CellTest
         SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), cfm, cfm2);
     }
 
-    private static ColumnDefinition fakeColumn(String name, AbstractType<?> type)
+    private static ColumnMetadata fakeColumn(String name, AbstractType<?> type)
     {
-        return new ColumnDefinition("fakeKs",
-                                    "fakeTable",
-                                    ColumnIdentifier.getInterned(name, false),
-                                    type,
-                                    ColumnDefinition.NO_POSITION,
-                                    ColumnDefinition.Kind.REGULAR);
+        return new ColumnMetadata("fakeKs",
+                                  "fakeTable",
+                                  ColumnIdentifier.getInterned(name, false),
+                                  type,
+                                  ColumnMetadata.NO_POSITION,
+                                  ColumnMetadata.Kind.REGULAR);
     }
 
     @Test
@@ -130,7 +131,7 @@ public class CellTest
     @Test
     public void testValidate()
     {
-        ColumnDefinition c;
+        ColumnMetadata c;
 
         // Valid cells
         c = fakeColumn("c", Int32Type.instance);
@@ -173,7 +174,7 @@ public class CellTest
                                     asList(f1, f2),
                                     asList(Int32Type.instance, UTF8Type.instance),
                                     true);
-        ColumnDefinition c;
+        ColumnMetadata c;
 
         // Valid cells
         c = fakeColumn("c", udt);
@@ -207,7 +208,7 @@ public class CellTest
                                     asList(Int32Type.instance, UTF8Type.instance),
                                     false);
 
-        ColumnDefinition c = fakeColumn("c", udt);
+        ColumnMetadata c = fakeColumn("c", udt);
         ByteBuffer val = udt(bb(1), bb("foo"));
 
         // Valid cells
@@ -278,7 +279,7 @@ public class CellTest
     @Test
     public void testComplexCellReconcile()
     {
-        ColumnDefinition m = cfm2.getColumnDefinition(new ColumnIdentifier("m", false));
+        ColumnMetadata m = cfm2.getColumn(new ColumnIdentifier("m", false));
         int now1 = FBUtilities.nowInSeconds();
         long ts1 = now1*1000000;
 
@@ -318,21 +319,21 @@ public class CellTest
         return Cells.reconcile(c2, c1, now) == c2 ? 1 : 0;
     }
 
-    private Cell regular(CFMetaData cfm, String columnName, String value, long timestamp)
+    private Cell regular(TableMetadata cfm, String columnName, String value, long timestamp)
     {
-        ColumnDefinition cdef = cfm.getColumnDefinition(ByteBufferUtil.bytes(columnName));
+        ColumnMetadata cdef = cfm.getColumn(ByteBufferUtil.bytes(columnName));
         return BufferCell.live(cdef, timestamp, ByteBufferUtil.bytes(value));
     }
 
-    private Cell expiring(CFMetaData cfm, String columnName, String value, long timestamp, int localExpirationTime)
+    private Cell expiring(TableMetadata cfm, String columnName, String value, long timestamp, int localExpirationTime)
     {
-        ColumnDefinition cdef = cfm.getColumnDefinition(ByteBufferUtil.bytes(columnName));
+        ColumnMetadata cdef = cfm.getColumn(ByteBufferUtil.bytes(columnName));
         return new BufferCell(cdef, timestamp, 1, localExpirationTime, ByteBufferUtil.bytes(value), null);
     }
 
-    private Cell deleted(CFMetaData cfm, String columnName, int localDeletionTime, long timestamp)
+    private Cell deleted(TableMetadata cfm, String columnName, int localDeletionTime, long timestamp)
     {
-        ColumnDefinition cdef = cfm.getColumnDefinition(ByteBufferUtil.bytes(columnName));
+        ColumnMetadata cdef = cfm.getColumn(ByteBufferUtil.bytes(columnName));
         return BufferCell.tombstone(cdef, timestamp, localDeletionTime);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index abd5a04..f576290 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -35,7 +35,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -118,7 +118,7 @@ public class CleanupTest
         fillCF(cfs, "birthdate", LOOPS);
         assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
 
-        ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
+        ColumnMetadata cdef = cfs.metadata().getColumn(COLUMN);
         String indexName = "birthdate_key_index";
         long start = System.nanoTime();
         while (!cfs.getBuiltIndexes().contains(indexName) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
@@ -279,7 +279,7 @@ public class CleanupTest
         {
             String key = String.valueOf(i);
             // create a row and update the birthdate value, test that the index query fetches the new version
-            new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), ByteBufferUtil.bytes(key))
+            new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), ByteBufferUtil.bytes(key))
                     .clustering(COLUMN)
                     .add(colName, VALUE)
                     .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java
index 2f7aaa5..c016f9b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyMetricTest.java
@@ -55,7 +55,7 @@ public class ColumnFamilyMetricTest
 
         for (int j = 0; j < 10; j++)
         {
-            new RowUpdateBuilder(cfs.metadata, FBUtilities.timestampMicros(), String.valueOf(j))
+            new RowUpdateBuilder(cfs.metadata(), FBUtilities.timestampMicros(), String.valueOf(j))
                     .clustering("0")
                     .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                     .build()
@@ -91,7 +91,7 @@ public class ColumnFamilyMetricTest
         // This confirms another test/set up did not overflow the histogram
         store.metric.colUpdateTimeDeltaHistogram.cf.getSnapshot().get999thPercentile();
 
-        new RowUpdateBuilder(store.metadata, 0, "4242")
+        new RowUpdateBuilder(store.metadata(), 0, "4242")
             .clustering("0")
             .add("val", ByteBufferUtil.bytes("0"))
             .build()
@@ -101,7 +101,7 @@ public class ColumnFamilyMetricTest
         store.metric.colUpdateTimeDeltaHistogram.cf.getSnapshot().get999thPercentile();
 
         // smallest time delta that would overflow the histogram if unfiltered
-        new RowUpdateBuilder(store.metadata, 18165375903307L, "4242")
+        new RowUpdateBuilder(store.metadata(), 18165375903307L, "4242")
             .clustering("0")
             .add("val", ByteBufferUtil.bytes("0"))
             .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
index 0b04bb3..59f9db3 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreCQLHelperTest.java
@@ -31,7 +31,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.*;
-import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.db.marshal.*;
@@ -87,25 +86,23 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
                                                     typeB),
                                       true);
 
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addClusteringColumn("ck1", IntegerType.instance)
-                                           .addRegularColumn("reg1", typeC)
-                                           .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
-                                           .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
-                                           .build();
+        TableMetadata cfm =
+            TableMetadata.builder(keyspace, table)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addClusteringColumn("ck1", IntegerType.instance)
+                         .addRegularColumn("reg1", typeC)
+                         .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
+                         .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
+                         .build();
 
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    Tables.of(cfm),
-                                    Types.of(typeA, typeB, typeC));
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), Tables.of(cfm), Types.of(typeA, typeB, typeC));
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
-        assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a(a1 varint, a2 varint, a3 varint);",
-                                      "CREATE TYPE cql_test_keyspace_user_types.b(b1 a, b2 a, b3 a);",
-                                      "CREATE TYPE cql_test_keyspace_user_types.c(c1 b, c2 b, c3 b);"),
-                     ColumnFamilyStoreCQLHelper.getUserTypesAsCQL(cfs.metadata));
+        assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a (a1 varint, a2 varint, a3 varint);",
+                                      "CREATE TYPE cql_test_keyspace_user_types.b (b1 a, b2 a, b3 a);",
+                                      "CREATE TYPE cql_test_keyspace_user_types.c (c1 b, c2 b, c3 b);"),
+                     ColumnFamilyStoreCQLHelper.getUserTypesAsCQL(cfs.metadata()));
     }
 
     @Test
@@ -114,39 +111,36 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_dropped_columns";
         String table = "test_table_dropped_columns";
 
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addClusteringColumn("ck1", IntegerType.instance)
-                                           .addRegularColumn("reg1", IntegerType.instance)
-                                           .addRegularColumn("reg2", IntegerType.instance)
-                                           .addRegularColumn("reg3", IntegerType.instance)
-                                           .build();
-
+        TableMetadata.Builder builder =
+            TableMetadata.builder(keyspace, table)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addClusteringColumn("ck1", IntegerType.instance)
+                         .addRegularColumn("reg1", IntegerType.instance)
+                         .addRegularColumn("reg2", IntegerType.instance)
+                         .addRegularColumn("reg3", IntegerType.instance);
 
-        ColumnDefinition reg1 = cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1"));
-        ColumnDefinition reg2 = cfm.getColumnDefinition(ByteBufferUtil.bytes("reg2"));
-        ColumnDefinition reg3 = cfm.getColumnDefinition(ByteBufferUtil.bytes("reg3"));
+        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
+        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
+        ColumnMetadata reg3 = builder.getColumn(ByteBufferUtil.bytes("reg3"));
 
-        cfm.removeColumnDefinition(reg1);
-        cfm.removeColumnDefinition(reg2);
-        cfm.removeColumnDefinition(reg3);
+        builder.removeRegularOrStaticColumn(reg1.name)
+               .removeRegularOrStaticColumn(reg2.name)
+               .removeRegularOrStaticColumn(reg3.name);
 
-        cfm.recordColumnDrop(reg1, 10000);
-        cfm.recordColumnDrop(reg2, 20000);
-        cfm.recordColumnDrop(reg3, 30000);
+        builder.recordColumnDrop(reg1, 10000)
+               .recordColumnDrop(reg2, 20000)
+               .recordColumnDrop(reg3, 30000);
 
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
         assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING TIMESTAMP 10000;",
                                       "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING TIMESTAMP 30000;",
                                       "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING TIMESTAMP 20000;"),
-                     ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(cfs.metadata));
+                     ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()));
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" +
         "\tpk1 varint,\n" +
         "\tck1 varint,\n" +
@@ -162,34 +156,32 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_readded_columns";
         String table = "test_table_readded_columns";
 
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addClusteringColumn("ck1", IntegerType.instance)
-                                           .addRegularColumn("reg1", IntegerType.instance)
-                                           .addStaticColumn("reg2", IntegerType.instance)
-                                           .addRegularColumn("reg3", IntegerType.instance)
-                                           .build();
+        TableMetadata.Builder builder =
+            TableMetadata.builder(keyspace, table)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addClusteringColumn("ck1", IntegerType.instance)
+                         .addRegularColumn("reg1", IntegerType.instance)
+                         .addStaticColumn("reg2", IntegerType.instance)
+                         .addRegularColumn("reg3", IntegerType.instance);
 
-        ColumnDefinition reg1 = cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1"));
-        ColumnDefinition reg2 = cfm.getColumnDefinition(ByteBufferUtil.bytes("reg2"));
+        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
+        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
 
-        cfm.removeColumnDefinition(reg1);
-        cfm.removeColumnDefinition(reg2);
+        builder.removeRegularOrStaticColumn(reg1.name);
+        builder.removeRegularOrStaticColumn(reg2.name);
 
-        cfm.recordColumnDrop(reg1, 10000);
-        cfm.recordColumnDrop(reg2, 20000);
+        builder.recordColumnDrop(reg1, 10000);
+        builder.recordColumnDrop(reg2, 20000);
 
-        cfm.addColumnDefinition(reg1);
-        cfm.addColumnDefinition(reg2);
+        builder.addColumn(reg1);
+        builder.addColumn(reg2);
 
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
         // when re-adding, column is present in CREATE, then in DROP and then in ADD again, to record DROP with a proper timestamp
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS cql_test_keyspace_readded_columns.test_table_readded_columns (\n" +
         "\tpk1 varint,\n" +
         "\tck1 varint,\n" +
@@ -202,7 +194,7 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
                                       "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;",
                                       "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg2 USING TIMESTAMP 20000;",
                                       "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg2 varint static;"),
-                     ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(cfs.metadata));
+                     ColumnFamilyStoreCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()));
     }
 
     @Test
@@ -211,24 +203,22 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_create_table";
         String table = "test_table_create_table";
 
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addPartitionKey("pk2", AsciiType.instance)
-                                           .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
-                                           .addClusteringColumn("ck2", IntegerType.instance)
-                                           .addStaticColumn("st1", AsciiType.instance)
-                                           .addRegularColumn("reg1", AsciiType.instance)
-                                           .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
-                                           .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
-                                           .build();
-
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        TableMetadata.Builder metadata =
+            TableMetadata.builder(keyspace, table)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addPartitionKeyColumn("pk2", AsciiType.instance)
+                         .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
+                         .addClusteringColumn("ck2", IntegerType.instance)
+                         .addStaticColumn("st1", AsciiType.instance)
+                         .addRegularColumn("reg1", AsciiType.instance)
+                         .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
+                         .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true));
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS cql_test_keyspace_create_table.test_table_create_table (\n" +
         "\tpk1 varint,\n" +
         "\tpk2 ascii,\n" +
@@ -239,7 +229,7 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         "\treg2 frozen<list<varint>>,\n" +
         "\treg3 map<ascii, varint>,\n" +
         "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
     }
 
@@ -249,21 +239,20 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_compact";
         String table = "test_table_compact";
 
-        CFMetaData cfm = CFMetaData.Builder.createDense(keyspace, table, true, false)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addPartitionKey("pk2", AsciiType.instance)
-                                           .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
-                                           .addClusteringColumn("ck2", IntegerType.instance)
-                                           .addRegularColumn("reg", IntegerType.instance)
-                                           .build();
+        TableMetadata.Builder metadata =
+            TableMetadata.builder(keyspace, table)
+                         .isDense(true)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addPartitionKeyColumn("pk2", AsciiType.instance)
+                         .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
+                         .addClusteringColumn("ck2", IntegerType.instance)
+                         .addRegularColumn("reg", IntegerType.instance);
 
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS cql_test_keyspace_compact.test_table_compact (\n" +
         "\tpk1 varint,\n" +
         "\tpk2 ascii,\n" +
@@ -271,7 +260,7 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         "\tck2 varint,\n" +
         "\treg varint,\n" +
         "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
-        "\tWITH ID = " + cfm.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE\n" +
         "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
     }
@@ -282,21 +271,21 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_counter";
         String table = "test_table_counter";
 
-        CFMetaData cfm = CFMetaData.Builder.createDense(keyspace, table, true, true)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addPartitionKey("pk2", AsciiType.instance)
-                                           .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
-                                           .addClusteringColumn("ck2", IntegerType.instance)
-                                           .addRegularColumn("cnt", CounterColumnType.instance)
-                                           .build();
+        TableMetadata.Builder metadata =
+            TableMetadata.builder(keyspace, table)
+                         .isDense(true)
+                         .isCounter(true)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addPartitionKeyColumn("pk2", AsciiType.instance)
+                         .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
+                         .addClusteringColumn("ck2", IntegerType.instance)
+                         .addRegularColumn("cnt", CounterColumnType.instance);
 
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS cql_test_keyspace_counter.test_table_counter (\n" +
         "\tpk1 varint,\n" +
         "\tpk2 ascii,\n" +
@@ -304,7 +293,7 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         "\tck2 varint,\n" +
         "\tcnt counter,\n" +
         "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
-        "\tWITH ID = " + cfm.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE\n" +
         "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
     }
@@ -315,35 +304,32 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_options";
         String table = "test_table_options";
 
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addClusteringColumn("cl1", IntegerType.instance)
-                                           .addRegularColumn("reg1", AsciiType.instance)
-                                           .build();
-
-        cfm.recordColumnDrop(cfm.getColumnDefinition(ByteBuffer.wrap("reg1".getBytes())), FBUtilities.timestampMicros());
-        cfm.bloomFilterFpChance(1.0);
-        cfm.comment("comment");
-        cfm.compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1")));
-        cfm.compression(CompressionParams.lz4(1 << 16));
-        cfm.dcLocalReadRepairChance(0.2);
-        cfm.crcCheckChance(0.3);
-        cfm.defaultTimeToLive(4);
-        cfm.gcGraceSeconds(5);
-        cfm.minIndexInterval(6);
-        cfm.maxIndexInterval(7);
-        cfm.memtableFlushPeriod(8);
-        cfm.readRepairChance(0.9);
-        cfm.speculativeRetry(SpeculativeRetryParam.always());
-        cfm.extensions(ImmutableMap.of("ext1",
-                                       ByteBuffer.wrap("val1".getBytes())));
-
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        TableMetadata.Builder builder = TableMetadata.builder(keyspace, table);
+        builder.addPartitionKeyColumn("pk1", IntegerType.instance)
+               .addClusteringColumn("cl1", IntegerType.instance)
+               .addRegularColumn("reg1", AsciiType.instance)
+               .bloomFilterFpChance(1.0)
+               .comment("comment")
+               .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1")))
+               .compression(CompressionParams.lz4(1 << 16))
+               .dcLocalReadRepairChance(0.2)
+               .crcCheckChance(0.3)
+               .defaultTimeToLive(4)
+               .gcGraceSeconds(5)
+               .minIndexInterval(6)
+               .maxIndexInterval(7)
+               .memtableFlushPeriod(8)
+               .readRepairChance(0.9)
+               .speculativeRetry(SpeculativeRetryParam.always())
+               .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
+               .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
+                                 FBUtilities.timestampMicros());
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
+
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).endsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).endsWith(
         "AND bloom_filter_fp_chance = 1.0\n" +
         "\tAND dclocal_read_repair_chance = 0.2\n" +
         "\tAND crc_check_chance = 0.3\n" +
@@ -369,52 +355,46 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         String keyspace = "cql_test_keyspace_3";
         String table = "test_table_3";
 
-        CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
-                                           .addPartitionKey("pk1", IntegerType.instance)
-                                           .addClusteringColumn("cl1", IntegerType.instance)
-                                           .addRegularColumn("reg1", AsciiType.instance)
-                                           .build();
-
-        cfm.indexes(cfm.getIndexes()
-                       .with(IndexMetadata.fromIndexTargets(cfm,
-                                                            Collections.singletonList(new IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
-                                                                                                      IndexTarget.Type.VALUES)),
-                                                            "indexName",
-                                                            IndexMetadata.Kind.COMPOSITES,
-                                                            Collections.emptyMap()))
-                       .with(IndexMetadata.fromIndexTargets(cfm,
-                                                            Collections.singletonList(new IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
-                                                                                                      IndexTarget.Type.KEYS)),
-                                                            "indexName2",
-                                                            IndexMetadata.Kind.COMPOSITES,
-                                                            Collections.emptyMap()))
-                       .with(IndexMetadata.fromIndexTargets(cfm,
-                                                            Collections.singletonList(new IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
-                                                                                                      IndexTarget.Type.KEYS_AND_VALUES)),
-                                                            "indexName3",
-                                                            IndexMetadata.Kind.COMPOSITES,
-                                                            Collections.emptyMap()))
-                       .with(IndexMetadata.fromIndexTargets(cfm,
-                                                            Collections.singletonList(new IndexTarget(cfm.getColumnDefinition(ByteBufferUtil.bytes("reg1")).name,
-                                                                                                      IndexTarget.Type.KEYS_AND_VALUES)),
-                                                            "indexName4",
-                                                            IndexMetadata.Kind.CUSTOM,
-                                                            Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                                                     SASIIndex.class.getName()))
-                       ));
-
-
-        SchemaLoader.createKeyspace(keyspace,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        TableMetadata.Builder builder =
+            TableMetadata.builder(keyspace, table)
+                         .addPartitionKeyColumn("pk1", IntegerType.instance)
+                         .addClusteringColumn("cl1", IntegerType.instance)
+                         .addRegularColumn("reg1", AsciiType.instance);
+
+        ColumnIdentifier reg1 = ColumnIdentifier.getInterned("reg1", true);
+
+        builder.indexes(
+            Indexes.of(IndexMetadata.fromIndexTargets(
+            Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.VALUES)),
+                                                      "indexName",
+                                                      IndexMetadata.Kind.COMPOSITES,
+                                                      Collections.emptyMap()),
+                       IndexMetadata.fromIndexTargets(
+                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS)),
+                                                      "indexName2",
+                                                      IndexMetadata.Kind.COMPOSITES,
+                                                      Collections.emptyMap()),
+                       IndexMetadata.fromIndexTargets(
+                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
+                                                      "indexName3",
+                                                      IndexMetadata.Kind.COMPOSITES,
+                                                      Collections.emptyMap()),
+                       IndexMetadata.fromIndexTargets(
+                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
+                                                      "indexName4",
+                                                      IndexMetadata.Kind.CUSTOM,
+                                                      Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName()))));
+
+
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 
-        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (reg1);",
-                                      "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (reg1);",
-                                      "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (reg1);",
-                                      "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (reg1) USING 'org.apache.cassandra.index.sasi.SASIIndex';"),
-                     ColumnFamilyStoreCQLHelper.getIndexesAsCQL(cfs.metadata));
+        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (values(reg1));",
+                                      "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (keys(reg1));",
+                                      "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (entries(reg1));",
+                                      "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (entries(reg1)) USING 'org.apache.cassandra.index.sasi.SASIIndex';"),
+                     ColumnFamilyStoreCQLHelper.getIndexesAsCQL(cfs.metadata()));
     }
 
     private final static String SNAPSHOT = "testsnapshot";
@@ -447,10 +427,10 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         cfs.snapshot(SNAPSHOT);
 
         String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(b1 frozen<%s>, b2 frozen<%s>, b3 frozen<%s>);", keyspace(), typeB, typeA, typeA, typeA)));
-        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s(c1 frozen<%s>, c2 frozen<%s>, c3 frozen<%s>);", keyspace(), typeC, typeB, typeB, typeB)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (b1 frozen<%s>, b2 frozen<%s>, b3 frozen<%s>);", keyspace(), typeB, typeA, typeA, typeA)));
+        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (c1 frozen<%s>, c2 frozen<%s>, c3 frozen<%s>);", keyspace(), typeC, typeB, typeB, typeB)));
 
         schema = schema.substring(schema.indexOf("CREATE TABLE")); // trim to ensure order
 
@@ -463,7 +443,7 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
                                      "\treg3 int,\n" +
                                      "\treg1 " + typeC + ",\n" +
                                      "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
-                                     "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+                                     "\tWITH ID = " + cfs.metadata.id + "\n" +
                                      "\tAND CLUSTERING ORDER BY (ck1 ASC, ck2 DESC)"));
 
         schema = schema.substring(schema.indexOf("ALTER"));
@@ -539,11 +519,11 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
         "\tpk1 varint PRIMARY KEY,\n" +
         "\treg1 int)\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 
@@ -558,12 +538,12 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
         "\tpk1 varint PRIMARY KEY,\n" +
         "\treg1 int,\n" +
         "\treg2 int)\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 
@@ -579,12 +559,12 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
         "\tpk1 varint PRIMARY KEY,\n" +
         "\treg1 counter,\n" +
         "\treg2 counter)\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 
@@ -599,12 +579,12 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
         "\tpk1 varint,\n" +
         "\tck1 int,\n" +
         "\tPRIMARY KEY (pk1, ck1))\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 
@@ -620,13 +600,13 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
         "\tpk1 varint,\n" +
         "\tck1 int,\n" +
         "\treg int,\n" +
         "\tPRIMARY KEY (pk1, ck1))\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 
@@ -647,13 +627,13 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
 
         ColumnFamilyStore cfs = Keyspace.open(DYNAMIC_COMPOSITE).getColumnFamilyStore(DYNAMIC_COMPOSITE);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "CREATE TABLE IF NOT EXISTS " + DYNAMIC_COMPOSITE + "." + DYNAMIC_COMPOSITE + " (\n" +
         "\tkey ascii,\n" +
         "\tcols 'org.apache.cassandra.db.marshal.DynamicCompositeType(a=>org.apache.cassandra.db.marshal.BytesType,b=>org.apache.cassandra.db.marshal.BytesType,c=>org.apache.cassandra.db.marshal.BytesType)',\n" +
         "\tval ascii,\n" +
         "\tPRIMARY KEY (key, cols))\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 
@@ -663,20 +643,19 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         final String KEYSPACE = "thrift_compact_table_with_supercolumns_test";
         final String TABLE = "test_table_1";
 
-        CFMetaData cfm = CFMetaData.Builder.createSuper(KEYSPACE, TABLE, false)
-                                           .addPartitionKey("pk", BytesType.instance)
-                                           .addClusteringColumn("c1", AsciiType.instance)
-                                           .addClusteringColumn("c2", AsciiType.instance)
-                                           .addRegularColumn("", MapType.getInstance(Int32Type.instance, AsciiType.instance, true))
-                                           .build();
+        TableMetadata.Builder table =
+            TableMetadata.builder(KEYSPACE, TABLE)
+                         .isSuper(true)
+                         .addPartitionKeyColumn("pk", BytesType.instance)
+                         .addClusteringColumn("c1", AsciiType.instance)
+                         .addClusteringColumn("c2", AsciiType.instance)
+                         .addRegularColumn("", MapType.getInstance(Int32Type.instance, AsciiType.instance, true));
 
-        SchemaLoader.createKeyspace(KEYSPACE,
-                                    KeyspaceParams.simple(1),
-                                    cfm);
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), table);
 
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
 
-        assertTrue(ColumnFamilyStoreCQLHelper.getCFMetadataAsCQL(cfs.metadata, true).startsWith(
+        assertTrue(ColumnFamilyStoreCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
         "/*\n" +
         "Warning: Table " + KEYSPACE + "." + TABLE + " omitted because it has constructs not compatible with CQL (was created via legacy API).\n\n" +
         "Approximate structure, for reference:\n" +
@@ -687,7 +666,7 @@ public class ColumnFamilyStoreCQLHelperTest extends CQLTester
         "\tc2 ascii,\n" +
         "\t\"\" map<int, ascii>,\n" +
         "\tPRIMARY KEY (pk, c1, c2))\n" +
-        "\tWITH ID = " + cfs.metadata.cfId + "\n" +
+        "\tWITH ID = " + cfs.metadata.id + "\n" +
         "\tAND COMPACT STORAGE"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index a2003d8..e5c5727 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -34,7 +34,6 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Iterators;
 import org.apache.cassandra.*;
-import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.rows.*;
@@ -46,6 +45,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.ClearableHistogram;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -97,14 +97,14 @@ public class ColumnFamilyStoreTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 
-        new RowUpdateBuilder(cfs.metadata, 0, "key1")
+        new RowUpdateBuilder(cfs.metadata(), 0, "key1")
                 .clustering("Column1")
                 .add("val", "asdf")
                 .build()
                 .applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 1, "key1")
+        new RowUpdateBuilder(cfs.metadata(), 1, "key1")
                 .clustering("Column1")
                 .add("val", "asdf")
                 .build()
@@ -123,7 +123,7 @@ public class ColumnFamilyStoreTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 
         List<Mutation> rms = new LinkedList<>();
-        rms.add(new RowUpdateBuilder(cfs.metadata, 0, "key1")
+        rms.add(new RowUpdateBuilder(cfs.metadata(), 0, "key1")
                 .clustering("Column1")
                 .add("val", "asdf")
                 .build());
@@ -142,7 +142,7 @@ public class ColumnFamilyStoreTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD2);
 
-        RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), "key1", "Column1").applyUnsafe();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(), "key1", "Column1").applyUnsafe();
 
         Runnable r = new WrappedRunnable()
         {
@@ -170,22 +170,22 @@ public class ColumnFamilyStoreTest
         ByteBuffer val = ByteBufferUtil.bytes("val1");
 
         // insert
-        ColumnDefinition newCol = ColumnDefinition.regularDef(cfs.metadata, ByteBufferUtil.bytes("val2"), AsciiType.instance);
-        new RowUpdateBuilder(cfs.metadata, 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
-        new RowUpdateBuilder(cfs.metadata, 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        ColumnMetadata newCol = ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("val2"), AsciiType.instance);
+        new RowUpdateBuilder(cfs.metadata(), 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
         assertRangeCount(cfs, col, val, 2);
 
         // flush.
         cfs.forceBlockingFlush();
 
         // insert, don't flush
-        new RowUpdateBuilder(cfs.metadata, 1, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
-        new RowUpdateBuilder(cfs.metadata, 1, "key4").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, "key4").clustering("Column1").add("val", "val1").build().applyUnsafe();
         assertRangeCount(cfs, col, val, 4);
 
         // delete (from sstable and memtable)
-        RowUpdateBuilder.deleteRow(cfs.metadata, 5, "key1", "Column1").applyUnsafe();
-        RowUpdateBuilder.deleteRow(cfs.metadata, 5, "key3", "Column1").applyUnsafe();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 5, "key1", "Column1").applyUnsafe();
+        RowUpdateBuilder.deleteRow(cfs.metadata(), 5, "key3", "Column1").applyUnsafe();
 
         // verify delete
         assertRangeCount(cfs, col, val, 2);
@@ -197,15 +197,15 @@ public class ColumnFamilyStoreTest
         assertRangeCount(cfs, col, val, 2);
 
         // simulate a 'late' insertion that gets put in after the deletion. should get inserted, but fail on read.
-        new RowUpdateBuilder(cfs.metadata, 2, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
-        new RowUpdateBuilder(cfs.metadata, 2, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 2, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 2, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
 
         // should still be nothing there because we deleted this row. 2nd breakage, but was undetected because of 1837.
         assertRangeCount(cfs, col, val, 2);
 
         // make sure that new writes are recognized.
-        new RowUpdateBuilder(cfs.metadata, 10, "key5").clustering("Column1").add("val", "val1").build().applyUnsafe();
-        new RowUpdateBuilder(cfs.metadata, 10, "key6").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 10, "key5").clustering("Column1").add("val", "val1").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 10, "key6").clustering("Column1").add("val", "val1").build().applyUnsafe();
         assertRangeCount(cfs, col, val, 4);
 
         // and it remains so after flush. (this wasn't failing before, but it's good to check.)
@@ -257,9 +257,9 @@ public class ColumnFamilyStoreTest
     public void testBackupAfterFlush() throws Throwable
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key1")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
         cfs.forceBlockingFlush();
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key2")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
         cfs.forceBlockingFlush();
 
         for (int version = 1; version <= 2; ++version)
@@ -319,7 +319,7 @@ public class ColumnFamilyStoreTest
 //        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 //        SSTableDeletingTask.waitForDeletions();
 //
-//        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+//        final CFMetaData cfmeta = Schema.instance.getTableMetadataRef(ks, cf);
 //        Directories dir = new Directories(cfs.metadata);
 //
 //        // clear old SSTables (probably left by CFS.clearUnsafe() calls in other tests)
@@ -405,10 +405,10 @@ public class ColumnFamilyStoreTest
 
     private void assertRangeCount(ColumnFamilyStore cfs, ByteBuffer col, ByteBuffer val, int count)
     {
-        assertRangeCount(cfs, cfs.metadata.getColumnDefinition(col), val, count);
+        assertRangeCount(cfs, cfs.metadata().getColumn(col), val, count);
     }
 
-    private void assertRangeCount(ColumnFamilyStore cfs, ColumnDefinition col, ByteBuffer val, int count)
+    private void assertRangeCount(ColumnFamilyStore cfs, ColumnMetadata col, ByteBuffer val, int count)
     {
 
         int found = 0;
@@ -431,9 +431,9 @@ public class ColumnFamilyStoreTest
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
 
-        ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
+        ColumnFamilyStore.scrubDataDirectories(cfs.metadata());
 
-        new RowUpdateBuilder(cfs.metadata, 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
         cfs.forceBlockingFlush();
 
         // Nuke the metadata and reload that sstable
@@ -447,9 +447,9 @@ public class ColumnFamilyStoreTest
 
         ssTable.selfRef().release();
 
-        ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
+        ColumnFamilyStore.scrubDataDirectories(cfs.metadata());
 
-        List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister(Directories.OnTxnErr.THROW).listFiles();
+        List<File> ssTableFiles = new Directories(cfs.metadata()).sstableLister(Directories.OnTxnErr.THROW).listFiles();
         assertNotNull(ssTableFiles);
         assertEquals(0, ssTableFiles.size());
     }