Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:14 UTC

[08/37] cassandra git commit: Make TableMetadata immutable, optimize Schema
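
The change that drives every hunk below: ColumnFamilyStore.metadata is no
longer a mutable CFMetaData field but a method, cfs.metadata(), returning an
immutable TableMetadata, so tables are altered by copying rather than by
mutating shared state. A minimal sketch of the resulting alter-and-announce
pattern, pieced together from the test changes in this patch (the helper name
lowerGcGrace is illustrative, and the boolean argument to announceTableUpdate
is assumed to mean "announce locally"):

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.schema.MigrationManager;
    import org.apache.cassandra.schema.TableMetadata;

    final class AlterSketch
    {
        // Before: cfs.metadata.gcGraceSeconds(2) mutated shared metadata in place.
        // After: unbuild() into a builder, modify the copy, build a new immutable
        // TableMetadata, and announce it as a regular schema update.
        static void lowerGcGrace(ColumnFamilyStore cfs)
        {
            TableMetadata updated = cfs.metadata()
                                       .unbuild()
                                       .gcGraceSeconds(2)
                                       .build();
            MigrationManager.announceTableUpdate(updated, true); // boolean meaning assumed
        }
    }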

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 9120546..4f9b12f 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -45,9 +45,12 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.SchemaLoader.standardCFMD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -63,12 +66,7 @@ public class RangeTombstoneTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KSNAME,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KSNAME,
-                                                              CFNAME,
-                                                              1,
-                                                              UTF8Type.instance,
-                                                              Int32Type.instance,
-                                                              Int32Type.instance));
+                                    standardCFMD(KSNAME, CFNAME, 1, UTF8Type.instance, Int32Type.instance, Int32Type.instance));
     }
 
     @Test
@@ -82,20 +80,20 @@ public class RangeTombstoneTest
 
         UpdateBuilder builder;
 
-        builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 0; i < 40; i += 2)
             builder.newRow(i).add("val", i);
         builder.applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(10, 22).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(10, 22).build().applyUnsafe();
 
-        builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(2);
+        builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(2);
         for (int i = 1; i < 40; i += 2)
             builder.newRow(i).add("val", i);
         builder.applyUnsafe();
 
-        new RowUpdateBuilder(cfs.metadata, 3, key).addRangeTombstone(19, 27).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 3, key).addRangeTombstone(19, 27).build().applyUnsafe();
         // We don't flush to test with both a range tombstone in memtable and in sstable
 
         // Queries by name
@@ -135,14 +133,14 @@ public class RangeTombstoneTest
         // Inserting data
         String key = "k111";
 
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 0; i < 40; i += 2)
             builder.newRow(i).add("val", i);
         builder.applyUnsafe();
 
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(5, 10).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(5, 10).build().applyUnsafe();
 
-        new RowUpdateBuilder(cfs.metadata, 2, key).addRangeTombstone(15, 20).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 2, key).addRangeTombstone(15, 20).build().applyUnsafe();
 
         ImmutableBTreePartition partition;
 
@@ -210,14 +208,14 @@ public class RangeTombstoneTest
         sb.add(ClusteringBound.create(cfs.getComparator(), true, true, 1), ClusteringBound.create(cfs.getComparator(), false, true, 10));
         sb.add(ClusteringBound.create(cfs.getComparator(), true, true, 16), ClusteringBound.create(cfs.getComparator(), false, true, 20));
 
-        partition = Util.getOnlyPartitionUnfiltered(SinglePartitionReadCommand.create(cfs.metadata, FBUtilities.nowInSeconds(), Util.dk(key), sb.build()));
+        partition = Util.getOnlyPartitionUnfiltered(SinglePartitionReadCommand.create(cfs.metadata(), FBUtilities.nowInSeconds(), Util.dk(key), sb.build()));
         rt = rangeTombstones(partition);
         assertEquals(2, rt.size());
     }
 
     private Collection<RangeTombstone> rangeTombstones(ImmutableBTreePartition partition)
     {
-        List<RangeTombstone> tombstones = new ArrayList<RangeTombstone>();
+        List<RangeTombstone> tombstones = new ArrayList<>();
         Iterators.addAll(tombstones, partition.deletionInfo().rangeIterator(false));
         return tombstones;
     }
@@ -231,7 +229,7 @@ public class RangeTombstoneTest
         String key = "rt_times";
 
         int nowInSec = FBUtilities.nowInSeconds();
-        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1000, nowInSec)).apply();
+        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata(), Util.dk(key), 1000, nowInSec)).apply();
         cfs.forceBlockingFlush();
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
@@ -249,11 +247,11 @@ public class RangeTombstoneTest
         cfs.truncateBlocking();
         String key = "rt_times";
 
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(999).newRow(5).add("val", 5).apply();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(999).newRow(5).add("val", 5).apply();
 
         key = "rt_times2";
         int nowInSec = FBUtilities.nowInSeconds();
-        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1000, nowInSec)).apply();
+        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata(), Util.dk(key), 1000, nowInSec)).apply();
         cfs.forceBlockingFlush();
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
@@ -272,7 +270,7 @@ public class RangeTombstoneTest
         String key = "rt_times";
 
         int nowInSec = FBUtilities.nowInSeconds();
-        new RowUpdateBuilder(cfs.metadata, nowInSec, 1000L, key).addRangeTombstone(1, 2).build().apply();
+        new RowUpdateBuilder(cfs.metadata(), nowInSec, 1000L, key).addRangeTombstone(1, 2).build().apply();
         cfs.forceBlockingFlush();
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
@@ -290,11 +288,11 @@ public class RangeTombstoneTest
         cfs.truncateBlocking();
         String key = "rt_times";
 
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(999).newRow(5).add("val", 5).apply();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(999).newRow(5).add("val", 5).apply();
 
         key = "rt_times2";
         int nowInSec = FBUtilities.nowInSeconds();
-        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1000, nowInSec)).apply();
+        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata(), Util.dk(key), 1000, nowInSec)).apply();
         cfs.forceBlockingFlush();
 
         cfs.forceBlockingFlush();
@@ -317,17 +315,17 @@ public class RangeTombstoneTest
     {
         Keyspace ks = Keyspace.open(KSNAME);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME);
-        cfs.metadata.gcGraceSeconds(2);
+        MigrationManager.announceTableUpdate(cfs.metadata().unbuild().gcGraceSeconds(2).build(), true);
 
         String key = "7810";
 
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 10; i < 20; i ++)
             builder.newRow(i).add("val", i);
         builder.apply();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(10, 11).build().apply();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(10, 11).build().apply();
         cfs.forceBlockingFlush();
 
         Thread.sleep(5);
@@ -340,16 +338,16 @@ public class RangeTombstoneTest
     {
         Keyspace ks = Keyspace.open(KSNAME);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME);
-        cfs.metadata.gcGraceSeconds(2);
+        MigrationManager.announceTableUpdate(cfs.metadata().unbuild().gcGraceSeconds(2).build(), true);
 
         String key = "7808_1";
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 0; i < 40; i += 2)
             builder.newRow(i).add("val", i);
         builder.apply();
         cfs.forceBlockingFlush();
 
-        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1, 1)).apply();
+        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata(), Util.dk(key), 1, 1)).apply();
         cfs.forceBlockingFlush();
         Thread.sleep(5);
         cfs.forceMajorCompaction();
@@ -360,18 +358,18 @@ public class RangeTombstoneTest
     {
         Keyspace ks = Keyspace.open(KSNAME);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME);
-        cfs.metadata.gcGraceSeconds(2);
+        MigrationManager.announceTableUpdate(cfs.metadata().unbuild().gcGraceSeconds(2).build(), true);
 
         String key = "7808_2";
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 10; i < 20; i ++)
             builder.newRow(i).add("val", i);
         builder.apply();
         cfs.forceBlockingFlush();
 
-        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 0, 0)).apply();
+        new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata(), Util.dk(key), 0, 0)).apply();
 
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(1).newRow(5).add("val", 5).apply();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(1).newRow(5).add("val", 5).apply();
 
         cfs.forceBlockingFlush();
         Thread.sleep(5);
@@ -389,19 +387,19 @@ public class RangeTombstoneTest
         // Inserting data
         String key = "k2";
 
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 0; i < 20; i++)
             builder.newRow(i).add("val", i);
         builder.applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(5, 15).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(5, 15).build().applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(5, 10).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(5, 10).build().applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 2, key).addRangeTombstone(5, 8).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 2, key).addRangeTombstone(5, 8).build().applyUnsafe();
         cfs.forceBlockingFlush();
 
         Partition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build());
@@ -435,18 +433,18 @@ public class RangeTombstoneTest
         // Inserting data
         String key = "k3";
 
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(0).newRow(2).add("val", 2).applyUnsafe();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0).newRow(2).add("val", 2).applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(0, 10).build().applyUnsafe();
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(2).newRow(1).add("val", 1).applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(0, 10).build().applyUnsafe();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(2).newRow(1).add("val", 1).applyUnsafe();
         cfs.forceBlockingFlush();
 
         // Get the last value of the row
         FilteredPartition partition = Util.getOnlyPartition(Util.cmd(cfs, key).build());
         assertTrue(partition.rowCount() > 0);
 
-        int last = i(partition.unfilteredIterator(ColumnFilter.all(cfs.metadata), Slices.ALL, true).next().clustering().get(0));
+        int last = i(partition.unfilteredIterator(ColumnFilter.all(cfs.metadata()), Slices.ALL, true).next().clustering().get(0));
         assertEquals("Last column should be column 1 since column 2 has been deleted", 1, last);
     }
 
@@ -461,17 +459,25 @@ public class RangeTombstoneTest
         cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
-        ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
+        ColumnMetadata cd = cfs.metadata().getColumn(indexedColumnName).copy();
         IndexMetadata indexDef =
-            IndexMetadata.fromIndexTargets(cfs.metadata,
-                                           Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
+            IndexMetadata.fromIndexTargets(
+            Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
                                            "test_index",
                                            IndexMetadata.Kind.CUSTOM,
                                            ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
                                                            StubIndex.class.getName()));
 
-        if (!cfs.metadata.getIndexes().get("test_index").isPresent())
-            cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
+        TableMetadata current = cfs.metadata();
+
+        if (!current.indexes.get("test_index").isPresent())
+        {
+            TableMetadata updated =
+                current.unbuild()
+                       .indexes(current.indexes.with(indexDef))
+                       .build();
+            MigrationManager.announceTableUpdate(updated, true);
+        }
 
         Future<?> rebuild = cfs.indexManager.addIndex(indexDef);
         // If there is a rebuild, wait for it to finish so it doesn't race with the following insertions
@@ -485,13 +491,13 @@ public class RangeTombstoneTest
                                                      .orElseThrow(() -> new RuntimeException(new AssertionError("Index not found")));
         index.reset();
 
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 0; i < 10; i++)
             builder.newRow(i).add("val", i);
         builder.applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 0, key).addRangeTombstone(0, 7).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 0, key).addRangeTombstone(0, 7).build().applyUnsafe();
         cfs.forceBlockingFlush();
 
         assertEquals(10, index.rowsInserted.size());
@@ -515,13 +521,13 @@ public class RangeTombstoneTest
         cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
         for (int i = 0; i < 10; i += 2)
             builder.newRow(i).add("val", i);
         builder.applyUnsafe();
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 0, key).addRangeTombstone(0, 7).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 0, key).addRangeTombstone(0, 7).build().applyUnsafe();
         cfs.forceBlockingFlush();
 
         // there should be 2 sstables
@@ -539,11 +545,11 @@ public class RangeTombstoneTest
             {
                 // after compaction, we should have a single RT with a single row (the row 8)
                 Unfiltered u1 = iter.next();
-                assertTrue("Expecting open marker, got " + u1.toString(cfs.metadata), u1 instanceof RangeTombstoneMarker);
+                assertTrue("Expecting open marker, got " + u1.toString(cfs.metadata()), u1 instanceof RangeTombstoneMarker);
                 Unfiltered u2 = iter.next();
-                assertTrue("Expecting close marker, got " + u2.toString(cfs.metadata), u2 instanceof RangeTombstoneMarker);
+                assertTrue("Expecting close marker, got " + u2.toString(cfs.metadata()), u2 instanceof RangeTombstoneMarker);
                 Unfiltered u3 = iter.next();
-                assertTrue("Expecting row, got " + u3.toString(cfs.metadata), u3 instanceof Row);
+                assertTrue("Expecting row, got " + u3.toString(cfs.metadata()), u3 instanceof Row);
             }
         }
     }
@@ -559,17 +565,25 @@ public class RangeTombstoneTest
         cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
-        ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
+        ColumnMetadata cd = cfs.metadata().getColumn(indexedColumnName).copy();
         IndexMetadata indexDef =
-            IndexMetadata.fromIndexTargets(cfs.metadata,
-                                           Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
+            IndexMetadata.fromIndexTargets(
+            Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
                                            "test_index",
                                            IndexMetadata.Kind.CUSTOM,
                                            ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
                                                            StubIndex.class.getName()));
 
-        if (!cfs.metadata.getIndexes().get("test_index").isPresent())
-            cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
+        TableMetadata current = cfs.metadata();
+
+        if (!current.indexes.get("test_index").isPresent())
+        {
+            TableMetadata updated =
+                current.unbuild()
+                       .indexes(current.indexes.with(indexDef))
+                       .build();
+            MigrationManager.announceTableUpdate(updated, true);
+        }
 
         Future<?> rebuild = cfs.indexManager.addIndex(indexDef);
         // If there is a rebuild, wait for it to finish so it doesn't race with the following insertions
@@ -579,13 +593,13 @@ public class RangeTombstoneTest
         StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("test_index");
         index.reset();
 
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(0).newRow(1).add("val", 1).applyUnsafe();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0).newRow(1).add("val", 1).applyUnsafe();
 
         // add an RT which hides the column we just inserted
-        new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(0, 1).build().applyUnsafe();
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(0, 1).build().applyUnsafe();
 
         // now re-insert that column
-        UpdateBuilder.create(cfs.metadata, key).withTimestamp(2).newRow(1).add("val", 1).applyUnsafe();
+        UpdateBuilder.create(cfs.metadata(), key).withTimestamp(2).newRow(1).add("val", 1).applyUnsafe();
 
         cfs.forceBlockingFlush();
 

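For reference, the index-registration flow that appears twice in the test
above, collected in one place. This is a sketch assembled from the diff, not
an API reference; the import path for the test-only StubIndex and the method
name addTestIndex are assumptions:

    import java.util.Collections;
    import com.google.common.collect.ImmutableMap;
    import org.apache.cassandra.cql3.statements.IndexTarget;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.index.StubIndex;
    import org.apache.cassandra.schema.ColumnMetadata;
    import org.apache.cassandra.schema.IndexMetadata;
    import org.apache.cassandra.schema.MigrationManager;
    import org.apache.cassandra.schema.TableMetadata;

    final class IndexSketch
    {
        // IndexMetadata.fromIndexTargets no longer takes the table metadata, and
        // attaching the index means publishing a rebuilt TableMetadata rather than
        // mutating cfs.metadata.indexes in place.
        static void addTestIndex(ColumnFamilyStore cfs, ColumnMetadata cd)
        {
            IndexMetadata indexDef =
                IndexMetadata.fromIndexTargets(
                    Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
                    "test_index",
                    IndexMetadata.Kind.CUSTOM,
                    ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME, StubIndex.class.getName()));

            TableMetadata current = cfs.metadata();
            if (!current.indexes.get("test_index").isPresent())
            {
                TableMetadata updated = current.unbuild()
                                               .indexes(current.indexes.with(indexDef))
                                               .build();
                MigrationManager.announceTableUpdate(updated, true);
            }
        }
    }
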
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 9b7775da..249d780 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -49,6 +48,7 @@ import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -66,30 +66,28 @@ public class ReadCommandTest
     {
         DatabaseDescriptor.daemonInitialization();
 
-        CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
-
-        CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
-                                                         .addPartitionKey("key", BytesType.instance)
-                                                         .addClusteringColumn("col", AsciiType.instance)
-                                                         .addRegularColumn("a", AsciiType.instance)
-                                                         .addRegularColumn("b", AsciiType.instance).build();
-
-        CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
-                                                 .addPartitionKey("key", BytesType.instance)
-                                                 .addClusteringColumn("col", AsciiType.instance)
-                                                 .addRegularColumn("a", AsciiType.instance)
-                                                 .addRegularColumn("b", AsciiType.instance)
-                                                 .addRegularColumn("c", AsciiType.instance)
-                                                 .addRegularColumn("d", AsciiType.instance)
-                                                 .addRegularColumn("e", AsciiType.instance)
-                                                 .addRegularColumn("f", AsciiType.instance).build();
+        TableMetadata.Builder metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
+
+        TableMetadata.Builder metadata2 =
+            TableMetadata.builder(KEYSPACE, CF2)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance);
+
+        TableMetadata.Builder metadata3 =
+            TableMetadata.builder(KEYSPACE, CF3)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance)
+                         .addRegularColumn("c", AsciiType.instance)
+                         .addRegularColumn("d", AsciiType.instance)
+                         .addRegularColumn("e", AsciiType.instance)
+                         .addRegularColumn("f", AsciiType.instance);
 
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE,
-                                    KeyspaceParams.simple(1),
-                                    metadata1,
-                                    metadata2,
-                                    metadata3);
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), metadata1, metadata2, metadata3);
     }
 
     @Test
@@ -97,7 +95,7 @@ public class ReadCommandTest
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key1"))
                 .clustering("Column1")
                 .add("val", ByteBufferUtil.bytes("abcd"))
                 .build()
@@ -105,7 +103,7 @@ public class ReadCommandTest
 
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key2"))
                 .clustering("Column1")
                 .add("val", ByteBufferUtil.bytes("abcd"))
                 .build()
@@ -125,7 +123,7 @@ public class ReadCommandTest
 
         cfs.truncateBlocking();
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
                 .clustering("cc")
                 .add("a", ByteBufferUtil.bytes("abcd"))
                 .build()
@@ -133,7 +131,7 @@ public class ReadCommandTest
 
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
                 .clustering("dd")
                 .add("a", ByteBufferUtil.bytes("abcd"))
                 .build()
@@ -156,7 +154,7 @@ public class ReadCommandTest
 
         cfs.truncateBlocking();
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
                 .clustering("cc")
                 .add("a", ByteBufferUtil.bytes("abcd"))
                 .build()
@@ -164,7 +162,7 @@ public class ReadCommandTest
 
         cfs.forceBlockingFlush();
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
                 .clustering("dd")
                 .add("a", ByteBufferUtil.bytes("abcd"))
                 .build()
@@ -213,10 +211,10 @@ public class ReadCommandTest
 
         List<ByteBuffer> buffers = new ArrayList<>(groups.length);
         int nowInSeconds = FBUtilities.nowInSeconds();
-        ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata).build();
+        ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata()).build();
         RowFilter rowFilter = RowFilter.create();
         Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
-        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
+        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), false);
 
         for (String[][] group : groups)
         {
@@ -228,7 +226,7 @@ public class ReadCommandTest
             {
                 if (data[0].equals("1"))
                 {
-                    new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
+                    new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
                     .clustering(data[2])
                     .add(data[3], ByteBufferUtil.bytes("blah"))
                     .build()
@@ -236,9 +234,9 @@ public class ReadCommandTest
                 }
                 else
                 {
-                    RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
+                    RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
                 }
-                commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
+                commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
             }
 
             cfs.forceBlockingFlush();
@@ -266,7 +264,7 @@ public class ReadCommandTest
             {
                 iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
                                                                                                 MessagingService.current_version,
-                                                                                                cfs.metadata,
+                                                                                                cfs.metadata(),
                                                                                                 columnFilter,
                                                                                                 SerializationHelper.Flag.LOCAL));
             }
@@ -298,7 +296,7 @@ public class ReadCommandTest
                     while (rowIterator.hasNext())
                     {
                         Row row = rowIterator.next();
-                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
+                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata()));
                         //System.out.print(row.toString(cfs.metadata, true));
                     }
                 }

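The schema-definition side of the rename, condensed from the setup method
above: CFMetaData.Builder.create(...) becomes TableMetadata.builder(...),
addPartitionKey becomes addPartitionKeyColumn, and SchemaLoader.createKeyspace
now accepts the un-built builders directly (presumably so the loader can
finish building them itself). A sketch with placeholder keyspace and table
names:

    import org.apache.cassandra.SchemaLoader;
    import org.apache.cassandra.db.marshal.AsciiType;
    import org.apache.cassandra.db.marshal.BytesType;
    import org.apache.cassandra.schema.KeyspaceParams;
    import org.apache.cassandra.schema.TableMetadata;

    final class DefineSketch
    {
        static void defineTable()
        {
            // Note: the builder is handed over un-built; there is no trailing .build().
            TableMetadata.Builder table =
                TableMetadata.builder("ks", "cf")
                             .addPartitionKeyColumn("key", BytesType.instance)
                             .addClusteringColumn("col", AsciiType.instance)
                             .addRegularColumn("a", AsciiType.instance);

            SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), table);
        }
    }
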
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index f76bf93..5b05253 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -28,8 +28,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLogTestReplayer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -59,29 +59,34 @@ public class ReadMessageTest
     {
         DatabaseDescriptor.daemonInitialization();
 
-        CFMetaData cfForReadMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_FOR_READ_TEST)
-                                                            .addPartitionKey("key", BytesType.instance)
-                                                            .addClusteringColumn("col1", AsciiType.instance)
-                                                            .addClusteringColumn("col2", AsciiType.instance)
-                                                            .addRegularColumn("a", AsciiType.instance)
-                                                            .addRegularColumn("b", AsciiType.instance).build();
-
-        CFMetaData cfForCommitMetadata1 = CFMetaData.Builder.create(KEYSPACE1, CF_FOR_COMMIT_TEST)
-                                                       .addPartitionKey("key", BytesType.instance)
-                                                       .addClusteringColumn("name", AsciiType.instance)
-                                                       .addRegularColumn("commit1", AsciiType.instance).build();
-
-        CFMetaData cfForCommitMetadata2 = CFMetaData.Builder.create(KEYSPACENOCOMMIT, CF_FOR_COMMIT_TEST)
-                                                            .addPartitionKey("key", BytesType.instance)
-                                                            .addClusteringColumn("name", AsciiType.instance)
-                                                            .addRegularColumn("commit2", AsciiType.instance).build();
+        TableMetadata.Builder cfForReadMetadata =
+            TableMetadata.builder(KEYSPACE1, CF_FOR_READ_TEST)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col1", AsciiType.instance)
+                         .addClusteringColumn("col2", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance);
+
+        TableMetadata.Builder cfForCommitMetadata1 =
+            TableMetadata.builder(KEYSPACE1, CF_FOR_COMMIT_TEST)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("name", AsciiType.instance)
+                         .addRegularColumn("commit1", AsciiType.instance);
+
+        TableMetadata.Builder cfForCommitMetadata2 =
+            TableMetadata.builder(KEYSPACENOCOMMIT, CF_FOR_COMMIT_TEST)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("name", AsciiType.instance)
+                         .addRegularColumn("commit2", AsciiType.instance);
 
         SchemaLoader.prepareServer();
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF),
                                     cfForReadMetadata,
                                     cfForCommitMetadata1);
+
         SchemaLoader.createKeyspace(KEYSPACENOCOMMIT,
                                     KeyspaceParams.simpleTransient(1),
                                     SchemaLoader.standardCFMD(KEYSPACENOCOMMIT, CF),
@@ -158,13 +163,13 @@ public class ReadMessageTest
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key1"))
                 .clustering("Column1")
                 .add("val", ByteBufferUtil.bytes("abcd"))
                 .build()
                 .apply();
 
-        ColumnDefinition col = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("val"));
+        ColumnMetadata col = cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
         int found = 0;
         for (FilteredPartition partition : Util.getAll(Util.cmd(cfs).build()))
         {
@@ -184,20 +189,20 @@ public class ReadMessageTest
 
         ColumnFamilyStore cfsnocommit = Keyspace.open(KEYSPACENOCOMMIT).getColumnFamilyStore(CF_FOR_COMMIT_TEST);
 
-        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("row"))
+        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("row"))
                 .clustering("c")
                 .add("commit1", ByteBufferUtil.bytes("abcd"))
                 .build()
                 .apply();
 
-        new RowUpdateBuilder(cfsnocommit.metadata, 0, ByteBufferUtil.bytes("row"))
+        new RowUpdateBuilder(cfsnocommit.metadata(), 0, ByteBufferUtil.bytes("row"))
                 .clustering("c")
                 .add("commit2", ByteBufferUtil.bytes("abcd"))
                 .build()
                 .apply();
 
-        Checker checker = new Checker(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit1")),
-                                      cfsnocommit.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit2")));
+        Checker checker = new Checker(cfs.metadata().getColumn(ByteBufferUtil.bytes("commit1")),
+                                      cfsnocommit.metadata().getColumn(ByteBufferUtil.bytes("commit2")));
 
         CommitLogTestReplayer replayer = new CommitLogTestReplayer(checker);
         replayer.examineCommitLog();
@@ -208,13 +213,13 @@ public class ReadMessageTest
 
     static class Checker implements Predicate<Mutation>
     {
-        private final ColumnDefinition withCommit;
-        private final ColumnDefinition withoutCommit;
+        private final ColumnMetadata withCommit;
+        private final ColumnMetadata withoutCommit;
 
         boolean commitLogMessageFound = false;
         boolean noCommitLogMessageFound = false;
 
-        public Checker(ColumnDefinition withCommit, ColumnDefinition withoutCommit)
+        public Checker(ColumnMetadata withCommit, ColumnMetadata withoutCommit)
         {
             this.withCommit = withCommit;
             this.withoutCommit = withoutCommit;

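The column-level rename in this file follows the same pattern as the
table-level one: ColumnDefinition (org.apache.cassandra.config) becomes
ColumnMetadata (org.apache.cassandra.schema), and lookups move from
getColumnDefinition to getColumn. An excerpt, verbatim from the diff:

    // Old: ColumnDefinition col = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("val"));
    ColumnMetadata col = cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
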
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index 0b20343..7225560 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -126,7 +126,7 @@ public class RecoveryManagerFlushedTest
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-        new RowUpdateBuilder(cfs.metadata, 0, key)
+        new RowUpdateBuilder(cfs.metadata(), 0, key)
             .clustering("c")
             .add("val", "val1")
             .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 8897700..d60574f 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@ -98,11 +98,11 @@ public class RecoveryManagerMissingHeaderTest
         Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 
         DecoratedKey dk = Util.dk("keymulti");
-        UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+        UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata(), 1L, 0, "keymulti")
                                        .clustering("col1").add("val", "1")
                                        .build());
 
-        UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+        UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata(), 1L, 0, "keymulti")
                                        .clustering("col1").add("val", "1")
                                        .build());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index f5bda4f..164253d 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -30,21 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.compress.DeflateCompressor;
-import org.apache.cassandra.io.compress.LZ4Compressor;
-import org.apache.cassandra.io.compress.SnappyCompressor;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -53,16 +38,30 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.security.EncryptionContextGenerator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.db.commitlog.CommitLogReplayer;
+
+import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class RecoveryManagerTest
@@ -141,11 +140,11 @@ public class RecoveryManagerTest
             Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
             Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 
-            UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+            UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata(), 1L, 0, "keymulti")
                 .clustering("col1").add("val", "1")
                 .build());
 
-            UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+            UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata(), 1L, 0, "keymulti")
                                            .clustering("col2").add("val", "1")
                                            .build());
 
@@ -204,11 +203,11 @@ public class RecoveryManagerTest
         Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
         Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 
-        UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+        UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata(), 1L, 0, "keymulti")
             .clustering("col1").add("val", "1")
             .build());
 
-        UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+        UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata(), 1L, 0, "keymulti")
                                        .clustering("col2").add("val", "1")
                                        .build());
 
@@ -231,7 +230,7 @@ public class RecoveryManagerTest
 
         for (int i = 0; i < 10; ++i)
         {
-            new CounterMutation(new RowUpdateBuilder(cfs.metadata, 1L, 0, "key")
+            new CounterMutation(new RowUpdateBuilder(cfs.metadata(), 1L, 0, "key")
                 .clustering("cc").add("val", CounterContext.instance().createLocal(1L))
                 .build(), ConsistencyLevel.ALL).apply();
         }
@@ -240,7 +239,7 @@ public class RecoveryManagerTest
 
         int replayed = CommitLog.instance.resetUnsafe(false);
 
-        ColumnDefinition counterCol = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("val"));
+        ColumnMetadata counterCol = cfs.metadata().getColumn(ByteBufferUtil.bytes("val"));
         Row row = Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build());
         assertEquals(10L, CounterContext.instance().total(row.getCell(counterCol).value()));
     }
@@ -257,7 +256,7 @@ public class RecoveryManagerTest
         for (int i = 0; i < 10; ++i)
         {
             long ts = TimeUnit.MILLISECONDS.toMicros(timeMS + (i * 1000));
-            new RowUpdateBuilder(cfs.metadata, ts, "name-" + i)
+            new RowUpdateBuilder(cfs.metadata(), ts, "name-" + i)
                 .clustering("cc")
                 .add("val", Integer.toString(i))
                 .build()
@@ -292,7 +291,7 @@ public class RecoveryManagerTest
             else
                 ts = TimeUnit.MILLISECONDS.toMicros(timeMS + (i * 1000));
 
-            new RowUpdateBuilder(cfs.metadata, ts, "name-" + i)
+            new RowUpdateBuilder(cfs.metadata(), ts, "name-" + i)
                 .clustering("cc")
                 .add("val", Integer.toString(i))
                 .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 738888f..ef00b22 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -93,7 +93,7 @@ public class RecoveryManagerTruncateTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
         // add a single cell
-        new RowUpdateBuilder(cfs.metadata, 0, "key1")
+        new RowUpdateBuilder(cfs.metadata(), 0, "key1")
             .clustering("cc")
             .add("val", "val1")
             .build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 558c187..fee3f2c 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -32,8 +32,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.RowCacheKey;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -99,9 +98,9 @@ public class RowCacheTest
 
         ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
         DecoratedKey dk = cachedStore.decorateKey(key);
-        RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
+        RowCacheKey rck = new RowCacheKey(cachedStore.metadata(), dk);
 
-        RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
+        RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata(), System.currentTimeMillis(), key);
         rub.clustering(String.valueOf(0));
         rub.add("val", ByteBufferUtil.bytes("val" + 0));
         rub.build().applyUnsafe();
@@ -414,11 +413,11 @@ public class RowCacheTest
 
         ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
         DecoratedKey dk = cachedStore.decorateKey(key);
-        RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
+        RowCacheKey rck = new RowCacheKey(cachedStore.metadata(), dk);
         String values[] = new String[200];
         for (int i = 0; i < 200; i++)
         {
-            RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata, System.currentTimeMillis(), key);
+            RowUpdateBuilder rub = new RowUpdateBuilder(cachedStore.metadata(), System.currentTimeMillis(), key);
             rub.clustering(String.valueOf(i));
             values[i] = "val" + i;
             rub.add("val", ByteBufferUtil.bytes(values[i]));
@@ -548,12 +547,10 @@ public class RowCacheTest
     private static void readData(String keyspace, String columnFamily, int offset, int numberOfRows)
     {
         ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
-        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
 
         for (int i = offset; i < offset + numberOfRows; i++)
         {
             DecoratedKey key = Util.dk("key" + i);
-            Clustering cl = Clustering.make(ByteBufferUtil.bytes("col" + i));
             Util.getAll(Util.cmd(store, key).build());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 28590d8..e563070 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -33,7 +33,8 @@ import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.columniterator.AbstractSSTableIterator;
@@ -131,18 +132,21 @@ public class RowIndexEntryTest extends CQLTester
 
     private static class DoubleSerializer implements AutoCloseable
     {
-        CFMetaData cfMeta = CFMetaData.compile("CREATE TABLE pipe.dev_null (pk bigint, ck bigint, val text, PRIMARY KEY(pk, ck))", "foo");
+        TableMetadata metadata =
+            CreateTableStatement.parse("CREATE TABLE pipe.dev_null (pk bigint, ck bigint, val text, PRIMARY KEY(pk, ck))", "foo")
+                                .build();
+
         Version version = BigFormat.latestVersion;
 
         DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
         LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
         Row.Deletion deletion = Row.Deletion.LIVE;
 
-        SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
+        SerializationHeader header = new SerializationHeader(true, metadata, metadata.regularAndStaticColumns(), EncodingStats.NO_STATS);
 
         // create C-11206 + old serializer instances
         RowIndexEntry.IndexSerializer rieSerializer = new RowIndexEntry.Serializer(version, header);
-        Pre_C_11206_RowIndexEntry.Serializer oldSerializer = new Pre_C_11206_RowIndexEntry.Serializer(cfMeta, version, header);
+        Pre_C_11206_RowIndexEntry.Serializer oldSerializer = new Pre_C_11206_RowIndexEntry.Serializer(metadata, version, header);
 
         @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
         final DataOutputBuffer rieOutput = new DataOutputBuffer(1024);
@@ -201,7 +205,7 @@ public class RowIndexEntryTest extends CQLTester
         private AbstractUnfilteredRowIterator makeRowIter(Row staticRow, DecoratedKey partitionKey,
                                                           Iterator<Clustering> clusteringIter, SequentialWriter dataWriter)
         {
-            return new AbstractUnfilteredRowIterator(cfMeta, partitionKey, deletionInfo, cfMeta.partitionColumns(),
+            return new AbstractUnfilteredRowIterator(metadata, partitionKey, deletionInfo, metadata.regularAndStaticColumns(),
                                                      staticRow, false, new EncodingStats(0, 0, 0))
             {
                 protected Unfiltered computeNext()
@@ -225,7 +229,7 @@ public class RowIndexEntryTest extends CQLTester
         private Unfiltered buildRow(Clustering clustering)
         {
             BTree.Builder<ColumnData> builder = BTree.builder(ColumnData.comparator);
-            builder.add(BufferCell.live(cfMeta.partitionColumns().iterator().next(),
+            builder.add(BufferCell.live(metadata.regularAndStaticColumns().iterator().next(),
                                         1L,
                                         ByteBuffer.allocate(0)));
             return BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, builder.build());
@@ -404,8 +408,8 @@ public class RowIndexEntryTest extends CQLTester
         Pre_C_11206_RowIndexEntry simple = new Pre_C_11206_RowIndexEntry(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
-        SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
-        Pre_C_11206_RowIndexEntry.Serializer serializer = new Pre_C_11206_RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
+        SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS);
+        Pre_C_11206_RowIndexEntry.Serializer serializer = new Pre_C_11206_RowIndexEntry.Serializer(cfs.metadata(), BigFormat.latestVersion, header);
 
         serializer.serialize(simple, buffer);
 
@@ -565,7 +569,7 @@ public class RowIndexEntryTest extends CQLTester
             private final IndexInfo.Serializer idxSerializer;
             private final Version version;
 
-            Serializer(CFMetaData metadata, Version version, SerializationHeader header)
+            Serializer(TableMetadata metadata, Version version, SerializationHeader header)
             {
                 this.idxSerializer = IndexInfo.serializer(version, header);
                 this.version = version;

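One replacement worth calling out from this file: CFMetaData.compile(cql,
keyspace) is gone, and metadata for a literal CQL definition is now obtained
by parsing the CREATE TABLE statement into a builder and building it (the
"foo" keyspace argument is carried over from the original call):

    import org.apache.cassandra.cql3.statements.CreateTableStatement;
    import org.apache.cassandra.schema.TableMetadata;

    TableMetadata metadata =
        CreateTableStatement.parse("CREATE TABLE pipe.dev_null (pk bigint, ck bigint, val text, PRIMARY KEY(pk, ck))", "foo")
                            .build();
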
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 5fdb98a..5134857 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -29,8 +29,8 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.rows.*;
@@ -54,22 +54,23 @@ public class RowTest
     private int nowInSeconds;
     private DecoratedKey dk;
     private ColumnFamilyStore cfs;
-    private CFMetaData cfm;
+    private TableMetadata metadata;
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
         DatabaseDescriptor.daemonInitialization();
-        CFMetaData cfMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)
-                                                  .addPartitionKey("key", BytesType.instance)
-                                                  .addClusteringColumn("col1", AsciiType.instance)
-                                                  .addRegularColumn("a", AsciiType.instance)
-                                                  .addRegularColumn("b", AsciiType.instance)
-                                                  .build();
+
+        TableMetadata.Builder metadata =
+            TableMetadata.builder(KEYSPACE1, CF_STANDARD1)
+                         .addPartitionKeyColumn("key", BytesType.instance)
+                         .addClusteringColumn("col1", AsciiType.instance)
+                         .addRegularColumn("a", AsciiType.instance)
+                         .addRegularColumn("b", AsciiType.instance);
+
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    cfMetadata);
+
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata);
     }
 
     @Before
@@ -78,19 +79,19 @@ public class RowTest
         nowInSeconds = FBUtilities.nowInSeconds();
         dk = Util.dk("key0");
         cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
-        cfm = cfs.metadata;
+        metadata = cfs.metadata();
     }
 
     @Test
     public void testMergeRangeTombstones() throws InterruptedException
     {
-        PartitionUpdate update1 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
+        PartitionUpdate update1 = new PartitionUpdate(metadata, dk, metadata.regularAndStaticColumns(), 1);
         writeRangeTombstone(update1, "1", "11", 123, 123);
         writeRangeTombstone(update1, "2", "22", 123, 123);
         writeRangeTombstone(update1, "3", "31", 123, 123);
         writeRangeTombstone(update1, "4", "41", 123, 123);
 
-        PartitionUpdate update2 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 1);
+        PartitionUpdate update2 = new PartitionUpdate(metadata, dk, metadata.regularAndStaticColumns(), 1);
         writeRangeTombstone(update2, "1", "11", 123, 123);
         writeRangeTombstone(update2, "111", "112", 1230, 123);
         writeRangeTombstone(update2, "2", "24", 123, 123);
@@ -128,17 +129,17 @@ public class RowTest
     @Test
     public void testResolve()
     {
-        ColumnDefinition defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
-        ColumnDefinition defB = cfm.getColumnDefinition(new ColumnIdentifier("b", true));
+        ColumnMetadata defA = metadata.getColumn(new ColumnIdentifier("a", true));
+        ColumnMetadata defB = metadata.getColumn(new ColumnIdentifier("b", true));
 
         Row.Builder builder = BTreeRow.unsortedBuilder(nowInSeconds);
-        builder.newRow(cfm.comparator.make("c1"));
-        writeSimpleCellValue(builder, cfm, defA, "a1", 0);
-        writeSimpleCellValue(builder, cfm, defA, "a2", 1);
-        writeSimpleCellValue(builder, cfm, defB, "b1", 1);
+        builder.newRow(metadata.comparator.make("c1"));
+        writeSimpleCellValue(builder, defA, "a1", 0);
+        writeSimpleCellValue(builder, defA, "a2", 1);
+        writeSimpleCellValue(builder, defB, "b1", 1);
         Row row = builder.build();
 
-        PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, row);
+        PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
 
         Unfiltered unfiltered = update.unfilteredIterator().next();
         assertTrue(unfiltered.kind() == Unfiltered.Kind.ROW);
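
The column lookup rename in this hunk (ColumnDefinition becomes ColumnMetadata, getColumnDefinition(...) becomes getColumn(...)) follows the same scheme. A minimal sketch; the helper name columnNamed is illustrative only:

    import org.apache.cassandra.cql3.ColumnIdentifier;
    import org.apache.cassandra.schema.ColumnMetadata;
    import org.apache.cassandra.schema.TableMetadata;

    static ColumnMetadata columnNamed(TableMetadata metadata, String name)
    {
        // the boolean is ColumnIdentifier's keepCase flag; true preserves the name as given
        return metadata.getColumn(new ColumnIdentifier(name, true));
    }
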
@@ -152,11 +153,11 @@ public class RowTest
     public void testExpiringColumnExpiration() throws IOException
     {
         int ttl = 1;
-        ColumnDefinition def = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
+        ColumnMetadata def = metadata.getColumn(new ColumnIdentifier("a", true));
 
         Cell cell = BufferCell.expiring(def, 0, ttl, nowInSeconds, ((AbstractType) def.cellValueType()).decompose("a1"));
 
-        PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, BTreeRow.singleCellRow(cfm.comparator.make("c1"), cell));
+        PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeRow.singleCellRow(metadata.comparator.make("c1"), cell));
         new Mutation(update).applyUnsafe();
 
         // when we read with a nowInSeconds before the cell has expired,
@@ -172,14 +173,14 @@ public class RowTest
     @Test
     public void testHashCode()
     {
-        ColumnDefinition defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
-        ColumnDefinition defB = cfm.getColumnDefinition(new ColumnIdentifier("b", true));
+        ColumnMetadata defA = metadata.getColumn(new ColumnIdentifier("a", true));
+        ColumnMetadata defB = metadata.getColumn(new ColumnIdentifier("b", true));
 
         Row.Builder builder = BTreeRow.unsortedBuilder(nowInSeconds);
-        builder.newRow(cfm.comparator.make("c1"));
-        writeSimpleCellValue(builder, cfm, defA, "a1", 0);
-        writeSimpleCellValue(builder, cfm, defA, "a2", 1);
-        writeSimpleCellValue(builder, cfm, defB, "b1", 1);
+        builder.newRow(metadata.comparator.make("c1"));
+        writeSimpleCellValue(builder, defA, "a1", 0);
+        writeSimpleCellValue(builder, defA, "a2", 1);
+        writeSimpleCellValue(builder, defB, "b1", 1);
         Row row = builder.build();
 
         Map<Row, Integer> map = new HashMap<>();
@@ -189,7 +190,7 @@ public class RowTest
 
     private void assertRangeTombstoneMarkers(ClusteringBound start, ClusteringBound end, DeletionTime deletionTime, Object[] expected)
     {
-        AbstractType clusteringType = (AbstractType)cfm.comparator.subtype(0);
+        AbstractType clusteringType = (AbstractType) metadata.comparator.subtype(0);
 
         assertEquals(1, start.size());
         assertEquals(start.kind(), ClusteringPrefix.Kind.INCL_START_BOUND);
@@ -210,11 +211,10 @@ public class RowTest
     }
 
     private void writeSimpleCellValue(Row.Builder builder,
-                                      CFMetaData cfm,
-                                      ColumnDefinition columnDefinition,
+                                      ColumnMetadata columnMetadata,
                                       String value,
                                       long timestamp)
     {
-       builder.addCell(BufferCell.live(columnDefinition, timestamp, ((AbstractType) columnDefinition.cellValueType()).decompose(value)));
+       builder.addCell(BufferCell.live(columnMetadata, timestamp, ((AbstractType) columnMetadata.cellValueType()).decompose(value)));
     }
 }
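
The schema setup at the top of this file is the canonical migration for test classes: CFMetaData.Builder.create(...) becomes TableMetadata.builder(...), and addPartitionKey(...) becomes addPartitionKeyColumn(...). A minimal sketch of the renamed builder, with hypothetical keyspace, table, and column names:

    import org.apache.cassandra.db.marshal.AsciiType;
    import org.apache.cassandra.db.marshal.BytesType;
    import org.apache.cassandra.schema.TableMetadata;

    static TableMetadata exampleTable()
    {
        return TableMetadata.builder("test_ks", "test_cf")
                            .addPartitionKeyColumn("key", BytesType.instance)
                            .addClusteringColumn("col1", AsciiType.instance)
                            .addRegularColumn("a", AsciiType.instance)
                            .build(); // immutable once built
    }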

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
index 8e71d64..f4eafa3 100644
--- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -20,16 +20,10 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -52,22 +46,22 @@ public class RowUpdateBuilder
         this.updateBuilder = updateBuilder;
     }
 
-    public RowUpdateBuilder(CFMetaData metadata, long timestamp, Object partitionKey)
+    public RowUpdateBuilder(TableMetadata metadata, long timestamp, Object partitionKey)
     {
         this(metadata, FBUtilities.nowInSeconds(), timestamp, partitionKey);
     }
 
-    public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, Object partitionKey)
+    public RowUpdateBuilder(TableMetadata metadata, int localDeletionTime, long timestamp, Object partitionKey)
     {
         this(metadata, localDeletionTime, timestamp, metadata.params.defaultTimeToLive, partitionKey);
     }
 
-    public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Object partitionKey)
+    public RowUpdateBuilder(TableMetadata metadata, long timestamp, int ttl, Object partitionKey)
     {
         this(metadata, FBUtilities.nowInSeconds(), timestamp, ttl, partitionKey);
     }
 
-    public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, int ttl, Object partitionKey)
+    public RowUpdateBuilder(TableMetadata metadata, int localDeletionTime, long timestamp, int ttl, Object partitionKey)
     {
         this(PartitionUpdate.simpleBuilder(metadata, partitionKey));
 
@@ -137,27 +131,27 @@ public class RowUpdateBuilder
         update.add(builder.build());
     }
 
-    public static Mutation deleteRow(CFMetaData metadata, long timestamp, Object key, Object... clusteringValues)
+    public static Mutation deleteRow(TableMetadata metadata, long timestamp, Object key, Object... clusteringValues)
     {
         return deleteRowAt(metadata, timestamp, FBUtilities.nowInSeconds(), key, clusteringValues);
     }
 
-    public static Mutation deleteRowAt(CFMetaData metadata, long timestamp, int localDeletionTime, Object key, Object... clusteringValues)
+    public static Mutation deleteRowAt(TableMetadata metadata, long timestamp, int localDeletionTime, Object key, Object... clusteringValues)
     {
-        PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.partitionColumns(), 0);
+        PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.regularAndStaticColumns(), 0);
         deleteRow(update, timestamp, localDeletionTime, clusteringValues);
         // note that the created mutation may get further updates later on, so we don't use the ctor that creates a singletonMap
         // underneath (this class is for convenience, not performance)
-        return new Mutation(update.metadata().ksName, update.partitionKey()).add(update);
+        return new Mutation(update.metadata().keyspace, update.partitionKey()).add(update);
     }
 
-    private static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
+    private static DecoratedKey makeKey(TableMetadata metadata, Object... partitionKey)
     {
         if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
             return (DecoratedKey)partitionKey[0];
 
-        ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
-        return metadata.decorateKey(key);
+        ByteBuffer key = metadata.partitionKeyAsClusteringComparator().make(partitionKey).serializeAsPartitionKey();
+        return metadata.partitioner.decorateKey(key);
     }
 
     public RowUpdateBuilder addRangeTombstone(RangeTombstone rt)
@@ -178,9 +172,9 @@ public class RowUpdateBuilder
         return this;
     }
 
-    public RowUpdateBuilder add(ColumnDefinition columnDefinition, Object value)
+    public RowUpdateBuilder add(ColumnMetadata columnMetadata, Object value)
     {
-        return add(columnDefinition.name.toString(), value);
+        return add(columnMetadata.name.toString(), value);
     }
 
     public RowUpdateBuilder delete(String columnName)
@@ -189,8 +183,8 @@ public class RowUpdateBuilder
         return this;
     }
 
-    public RowUpdateBuilder delete(ColumnDefinition columnDefinition)
+    public RowUpdateBuilder delete(ColumnMetadata columnMetadata)
     {
-        return delete(columnDefinition.name.toString());
+        return delete(columnMetadata.name.toString());
     }
 }
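
The makeKey() rewrite above captures the new key-handling surface: the static CFMetaData.serializePartitionKey(...) helper is gone, the clustering itself now knows how to serializeAsPartitionKey(), and the partitioner is reached directly through the TableMetadata.partitioner field. A minimal sketch mirroring makeKey(); the helper name decoratedKeyFor is illustrative only:

    import java.nio.ByteBuffer;
    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.schema.TableMetadata;

    static DecoratedKey decoratedKeyFor(TableMetadata metadata, Object... keyComponents)
    {
        // serialize the raw key components, then let the table's partitioner decorate them
        ByteBuffer key = metadata.partitionKeyAsClusteringComparator()
                                 .make(keyComponents)
                                 .serializeAsPartitionKey();
        return metadata.partitioner.decorateKey(key);
    }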

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 93ac46e..d9f6433 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -33,7 +33,6 @@ import org.junit.runner.RunWith;
 
 import org.apache.cassandra.*;
 import org.apache.cassandra.cache.ChunkCache;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -59,8 +58,14 @@ import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.SchemaLoader.counterCFMD;
+import static org.apache.cassandra.SchemaLoader.createKeyspace;
+import static org.apache.cassandra.SchemaLoader.getCompressionParameters;
+import static org.apache.cassandra.SchemaLoader.loadSchema;
+import static org.apache.cassandra.SchemaLoader.standardCFMD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -91,19 +96,18 @@ public class ScrubTest
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
-        SchemaLoader.loadSchema();
-        SchemaLoader.createKeyspace(KEYSPACE,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF2),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
-                                    SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF)
-                                                .compression(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID, 0, UUIDType.instance),
-                                    SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1, true),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true),
-                                    SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1_BYTEORDERED, true).copy(ByteOrderedPartitioner.instance),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2_BYTEORDERED, true).copy(ByteOrderedPartitioner.instance));
+        loadSchema();
+        createKeyspace(KEYSPACE,
+                       KeyspaceParams.simple(1),
+                       standardCFMD(KEYSPACE, CF),
+                       standardCFMD(KEYSPACE, CF2),
+                       standardCFMD(KEYSPACE, CF3),
+                       counterCFMD(KEYSPACE, COUNTER_CF).compression(getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
+                       standardCFMD(KEYSPACE, CF_UUID, 0, UUIDType.instance),
+                       SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1, true),
+                       SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true),
+                       SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1_BYTEORDERED, true).partitioner(ByteOrderedPartitioner.instance),
+                       SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2_BYTEORDERED, true).partitioner(ByteOrderedPartitioner.instance));
     }
 
     @Test
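
Note also the .copy(partitioner) idiom turning into a fluent builder call: the SchemaLoader helpers now hand back a TableMetadata.Builder, so per-table overrides such as the partitioner are applied before the keyspace is created. A minimal sketch under that assumption, with hypothetical keyspace and table names:

    import org.apache.cassandra.dht.ByteOrderedPartitioner;
    import org.apache.cassandra.schema.KeyspaceParams;
    import static org.apache.cassandra.SchemaLoader.createKeyspace;
    import static org.apache.cassandra.SchemaLoader.standardCFMD;

    static void defineByteOrderedKeyspace()
    {
        createKeyspace("scrub_ks",
                       KeyspaceParams.simple(1),
                       standardCFMD("scrub_ks", "by_bytes").partitioner(ByteOrderedPartitioner.instance));
    }
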
@@ -335,7 +339,7 @@ public class ScrubTest
 
                 for (String k : keys)
                 {
-                    PartitionUpdate update = UpdateBuilder.create(cfs.metadata, Util.dk(k))
+                    PartitionUpdate update = UpdateBuilder.create(cfs.metadata(), Util.dk(k))
                                                           .newRow("someName").add("val", "someValue")
                                                           .build();
 
@@ -447,7 +451,7 @@ public class ScrubTest
     {
         for (int i = 0; i < partitionsPerSSTable; i++)
         {
-            PartitionUpdate update = UpdateBuilder.create(cfs.metadata, String.valueOf(i))
+            PartitionUpdate update = UpdateBuilder.create(cfs.metadata(), String.valueOf(i))
                                                   .newRow("r1").add("val", "1")
                                                   .newRow("r1").add("val", "1")
                                                   .build();
@@ -463,7 +467,7 @@ public class ScrubTest
         assertTrue(values.length % 2 == 0);
         for (int i = 0; i < values.length; i +=2)
         {
-            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, String.valueOf(i));
+            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata(), String.valueOf(i));
             if (composite)
             {
                 builder.newRow("c" + i)
@@ -486,7 +490,7 @@ public class ScrubTest
     {
         for (int i = 0; i < partitionsPerSSTable; i++)
         {
-            PartitionUpdate update = UpdateBuilder.create(cfs.metadata, String.valueOf(i))
+            PartitionUpdate update = UpdateBuilder.create(cfs.metadata(), String.valueOf(i))
                                                   .newRow("r1").add("val", 100L)
                                                   .build();
             new CounterMutation(new Mutation(update), ConsistencyLevel.ONE).apply();
@@ -510,7 +514,7 @@ public class ScrubTest
         QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text primary key, b int)", ConsistencyLevel.ONE);
         ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation");
 
-        new Mutation(UpdateBuilder.create(cfs2.metadata, "key").newRow().add("b", LongType.instance.decompose(1L)).build()).apply();
+        new Mutation(UpdateBuilder.create(cfs2.metadata(), "key").newRow().add("b", LongType.instance.decompose(1L)).build()).apply();
         cfs2.forceBlockingFlush();
 
         CompactionManager.instance.performScrub(cfs2, false, false, 2);
@@ -634,10 +638,10 @@ public class ScrubTest
         assertOrdered(Util.cmd(cfs).filterOn(colName, Operator.EQ, 1L).build(), numRows / 2);
     }
 
-    private static SSTableMultiWriter createTestWriter(Descriptor descriptor, long keyCount, CFMetaData metadata, LifecycleTransaction txn)
+    private static SSTableMultiWriter createTestWriter(Descriptor descriptor, long keyCount, TableMetadataRef metadata, LifecycleTransaction txn)
     {
-        SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
-        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0);
+        SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS);
+        MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0);
         return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn);
     }
 
@@ -654,7 +658,7 @@ public class ScrubTest
      */
     private static class TestWriter extends BigTableWriter
     {
-        TestWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata,
+        TestWriter(Descriptor descriptor, long keyCount, long repairedAt, TableMetadataRef metadata,
                    MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
         {
             super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn);
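
The writer plumbing is the other half of the change: long-lived components such as SSTable writers now hold a TableMetadataRef, a stable reference that is dereferenced with get() whenever the current immutable TableMetadata is needed, which is why the serialization header and comparator above go through metadata.get(). A minimal sketch of the dereference pattern; the helper name headerFor is illustrative only:

    import org.apache.cassandra.db.SerializationHeader;
    import org.apache.cassandra.db.rows.EncodingStats;
    import org.apache.cassandra.schema.TableMetadata;
    import org.apache.cassandra.schema.TableMetadataRef;

    static SerializationHeader headerFor(TableMetadataRef ref)
    {
        TableMetadata current = ref.get(); // always the latest published schema for this table
        return new SerializationHeader(true, current, current.regularAndStaticColumns(), EncodingStats.NO_STATS);
    }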