Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:21 UTC

[15/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ee1ba6a..cf7f3d3 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -27,7 +27,7 @@ import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Striped;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -41,7 +41,7 @@ public class PaxosState
     private final Commit accepted;
     private final Commit mostRecentCommit;
 
-    public PaxosState(DecoratedKey key, CFMetaData metadata)
+    public PaxosState(DecoratedKey key, TableMetadata metadata)
     {
         this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
     }
@@ -92,7 +92,7 @@ public class PaxosState
         }
         finally
         {
-            Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepare.addNano(System.nanoTime() - start);
+            Keyspace.open(toPrepare.update.metadata().keyspace).getColumnFamilyStore(toPrepare.update.metadata().id).metric.casPrepare.addNano(System.nanoTime() - start);
         }
 
     }
@@ -127,7 +127,7 @@ public class PaxosState
         }
         finally
         {
-            Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casPropose.addNano(System.nanoTime() - start);
+            Keyspace.open(proposal.update.metadata().keyspace).getColumnFamilyStore(proposal.update.metadata().id).metric.casPropose.addNano(System.nanoTime() - start);
         }
     }
 
@@ -143,7 +143,7 @@ public class PaxosState
             // erase the in-progress update.
             // The table may have been truncated since the proposal was initiated. In that case, we
             // don't want to perform the mutation and potentially resurrect truncated data
-            if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().cfId))
+            if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().id))
             {
                 Tracing.trace("Committing proposal {}", proposal);
                 Mutation mutation = proposal.makeMutation();
@@ -158,7 +158,7 @@ public class PaxosState
         }
         finally
         {
-            Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime() - start);
+            Keyspace.open(proposal.update.metadata().keyspace).getColumnFamilyStore(proposal.update.metadata().id).metric.casCommit.addNano(System.nanoTime() - start);
         }
     }
 }
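
The pattern above repeats across prepare, propose and commit: the CFMetaData pair (ksName, cfId) becomes the TableMetadata fields keyspace and id. A minimal sketch of the new lookup path, with the helper name and factoring being illustrative only:

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.schema.TableMetadata;

    final class CasMetrics
    {
        // Resolve the store from the immutable TableMetadata and record a
        // CAS-prepare latency, as the finally blocks above do inline.
        static void recordCasPrepare(TableMetadata metadata, long startNanos)
        {
            ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace)
                                            .getColumnFamilyStore(metadata.id);
            cfs.metric.casPrepare.addNano(System.nanoTime() - startNanos);
        }
    }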

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 5915eab..381c498 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -28,12 +28,13 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.UUIDGen;
@@ -49,7 +50,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 
     private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
-    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
+    public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
     {
         super(targets, consistency, queryStartNanoTime);
         // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
@@ -89,7 +90,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec)
+    public Iterable<InetAddress> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec)
     {
         // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see
         // comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fab9372..0eee6f0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.compress.lzf.LZFInputStream;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -42,22 +42,19 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.io.util.TrackedInputStream;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
 /**
  * StreamReader reads from a stream and writes to an SSTable.
  */
 public class StreamReader
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
-    protected final UUID cfId;
+    protected final TableId tableId;
     protected final long estimatedKeys;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamSession session;
@@ -71,7 +68,7 @@ public class StreamReader
     public StreamReader(FileMessageHeader header, StreamSession session)
     {
         this.session = session;
-        this.cfId = header.cfId;
+        this.tableId = header.tableId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
         this.inputVersion = header.version;
@@ -92,15 +89,11 @@ public class StreamReader
     {
         long totalSize = totalSize();
 
-        Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        ColumnFamilyStore cfs = null;
-        if (kscf != null)
-            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
-        if (kscf == null || cfs == null)
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+        if (cfs == null)
         {
             // schema was dropped during streaming
-            throw new IOException("CF " + cfId + " was dropped during streaming");
+            throw new IOException("CF " + tableId + " was dropped during streaming");
         }
 
         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
@@ -108,7 +101,7 @@ public class StreamReader
                      cfs.getColumnFamilyName());
 
         TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
-        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()),
                                                                  totalSize, session.planId());
         SSTableMultiWriter writer = null;
         try
@@ -142,7 +135,7 @@ public class StreamReader
         }
     }
 
-    protected SerializationHeader getHeader(CFMetaData metadata)
+    protected SerializationHeader getHeader(TableMetadata metadata)
     {
         return header != null ? header.toHeader(metadata) : null; // pre-3.0 sstables have no SerializationHeader
     }
@@ -153,7 +146,7 @@ public class StreamReader
         if (localDir == null)
             throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
 
-        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata));
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
         StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
         return writer;
     }
@@ -181,7 +174,7 @@ public class StreamReader
         public static final String BUFFER_FILE_PREFIX = "buf";
         public static final String BUFFER_FILE_SUFFIX = "dat";
 
-        private final CFMetaData metadata;
+        private final TableMetadata metadata;
         private final DataInputPlus in;
         private final SerializationHeader header;
         private final SerializationHelper helper;
@@ -192,7 +185,7 @@ public class StreamReader
         private Row staticRow;
         private IOException exception;
 
-        public StreamDeserializer(CFMetaData metadata, InputStream in, Version version, SerializationHeader header,
+        public StreamDeserializer(TableMetadata metadata, InputStream in, Version version, SerializationHeader header,
                                   long totalSize, UUID sessionId) throws IOException
         {
             this.metadata = metadata;
@@ -203,22 +196,22 @@ public class StreamReader
 
         public StreamDeserializer newPartition() throws IOException
         {
-            key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in));
+            key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
             partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
             iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
             staticRow = iterator.readStaticRow();
             return this;
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
             return metadata;
         }
 
-        public PartitionColumns columns()
+        public RegularAndStaticColumns columns()
         {
             // We don't know which columns we'll get so assume it can be all of them
-            return metadata.partitionColumns();
+            return metadata.regularAndStaticColumns();
         }
 
         public boolean isReverseOrder()
@@ -308,13 +301,13 @@ public class StreamReader
             }
         }
 
-        private static File getTempBufferFile(CFMetaData metadata, long totalSize, UUID sessionId) throws IOException
+        private static File getTempBufferFile(TableMetadata metadata, long totalSize, UUID sessionId) throws IOException
         {
-            ColumnFamilyStore cfs = Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName);
+            ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
             if (cfs == null)
             {
                 // schema was dropped during streaming
-                throw new RuntimeException(String.format("CF %s.%s was dropped during streaming", metadata.ksName, metadata.cfName));
+                throw new RuntimeException(String.format("Table %s was dropped during streaming", metadata.toString()));
             }
 
             long maxSize = Math.min(MAX_SPILL_FILE_SIZE, totalSize);
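
Note that the old two-step lookup (Schema.instance.getCF(cfId), then Keyspace.open(...).getColumnFamilyStore(...)) collapses into a single ColumnFamilyStore.getIfExists(tableId) call, here and in CompressedStreamReader below. A sketch of the dropped-table guard it enables (helper name hypothetical):

    import java.io.IOException;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.schema.TableId;

    final class StreamGuards
    {
        // Resolve the live store, or fail the way the stream readers now do
        // when the table was dropped while the stream was in flight.
        static ColumnFamilyStore requireLiveTable(TableId tableId) throws IOException
        {
            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
            if (cfs == null)
                throw new IOException("CF " + tableId + " was dropped during streaming");
            return cfs;
        }
    }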

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6c60b74..d0c4d50 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -30,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
@@ -45,8 +43,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -75,9 +73,9 @@ public class StreamReceiveTask extends StreamTask
 
     private int remoteSSTablesReceived = 0;
 
-    public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
+    public StreamReceiveTask(StreamSession session, TableId tableId, int totalFiles, long totalSize)
     {
-        super(session, cfId);
+        super(session, tableId);
         this.totalFiles = totalFiles;
         this.totalSize = totalSize;
         // this is an "offline" transaction, as we currently manually expose the sstables once done;
@@ -102,7 +100,7 @@ public class StreamReceiveTask extends StreamTask
         }
 
         remoteSSTablesReceived++;
-        assert cfId.equals(sstable.getCfId());
+        assert tableId.equals(sstable.getTableId());
 
         Collection<SSTableReader> finished = null;
         try
@@ -136,7 +134,7 @@ public class StreamReceiveTask extends StreamTask
     public synchronized LifecycleTransaction getTransaction()
     {
         if (done)
-            throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId));
+            throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), tableId));
         return txn;
     }
 
@@ -156,8 +154,8 @@ public class StreamReceiveTask extends StreamTask
             ColumnFamilyStore cfs = null;
             try
             {
-                Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-                if (kscf == null)
+                cfs = ColumnFamilyStore.getIfExists(task.tableId);
+                if (cfs == null)
                 {
                     // schema was dropped during streaming
                     task.sstables.clear();
@@ -165,9 +163,8 @@ public class StreamReceiveTask extends StreamTask
                     task.session.taskCompleted(task);
                     return;
                 }
-                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
-                hasCDC = cfs.metadata.params.cdc;
+                hasViews = !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
+                hasCDC = cfs.metadata().params.cdc;
 
                 Collection<SSTableReader> readers = task.sstables;
 
@@ -193,7 +190,7 @@ public class StreamReceiveTask extends StreamTask
                                 {
                                     try (UnfilteredRowIterator rowIterator = scanner.next())
                                     {
-                                        Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
+                                        Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata())));
 
                                         // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
                                         // before transaction is done.
@@ -215,7 +212,7 @@ public class StreamReceiveTask extends StreamTask
                         cfs.indexManager.buildAllIndexesBlocking(readers);
 
                         //invalidate row and counter cache
-                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                        if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
                         {
                             List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
                             readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
@@ -230,7 +227,7 @@ public class StreamReceiveTask extends StreamTask
                                                  cfs.keyspace.getName(), cfs.getTableName());
                             }
 
-                            if (cfs.metadata.isCounter())
+                            if (cfs.metadata().isCounter())
                             {
                                 int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
                                 if (invalidatedKeys > 0)
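
Two spellings appear above: cfs.metadata (a field, still usable for the stable keyspace/table names) and cfs.metadata() (an accessor returning the current immutable TableMetadata). Code reading several schema properties should take one snapshot rather than dereferencing repeatedly; a sketch, assuming the accessor semantics implied by this commit:

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.schema.TableMetadata;

    final class MetadataSnapshots
    {
        // One immutable snapshot per operation; both properties are read
        // from the same schema version.
        static boolean cdcOrCounter(ColumnFamilyStore cfs)
        {
            TableMetadata current = cfs.metadata();
            return current.params.cdc || current.isCounter();
        }
    }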

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 736d30f..faa05d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -147,9 +148,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
     @VisibleForTesting
-    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
-    private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
+    private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;
     /* can be null when session is created in remote */
     private final StreamConnectionFactory factory;
@@ -223,10 +224,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     }
 
 
-    public LifecycleTransaction getTransaction(UUID cfId)
+    public LifecycleTransaction getTransaction(TableId tableId)
     {
-        assert receivers.containsKey(cfId);
-        return receivers.get(cfId).getTransaction();
+        assert receivers.containsKey(tableId);
+        return receivers.get(tableId).getTransaction();
     }
 
     private boolean isKeepAliveSupported()
@@ -424,13 +425,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 continue;
             }
 
-            UUID cfId = details.ref.get().metadata.cfId;
-            StreamTransferTask task = transfers.get(cfId);
+            TableId tableId = details.ref.get().metadata().id;
+            StreamTransferTask task = transfers.get(tableId);
             if (task == null)
             {
                 //guarantee atomicity
-                StreamTransferTask newTask = new StreamTransferTask(this, cfId);
-                task = transfers.putIfAbsent(cfId, newTask);
+                StreamTransferTask newTask = new StreamTransferTask(this, tableId);
+                task = transfers.putIfAbsent(tableId, newTask);
                 if (task == null)
                     task = newTask;
             }
@@ -525,7 +526,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
             case RECEIVED:
                 ReceivedMessage received = (ReceivedMessage) message;
-                received(received.cfId, received.sequenceNumber);
+                received(received.tableId, received.sequenceNumber);
                 break;
 
             case COMPLETE:
@@ -634,7 +635,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         StreamingMetrics.totalOutgoingBytes.inc(headerSize);
         metrics.outgoingBytes.inc(headerSize);
         // schedule timeout for receiving ACK
-        StreamTransferTask task = transfers.get(header.cfId);
+        StreamTransferTask task = transfers.get(header.tableId);
         if (task != null)
         {
             task.scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS);
@@ -652,8 +653,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         StreamingMetrics.totalIncomingBytes.inc(headerSize);
         metrics.incomingBytes.inc(headerSize);
         // send back file received message
-        handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
-        receivers.get(message.header.cfId).received(message.sstable);
+        handler.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
+        receivers.get(message.header.tableId).received(message.sstable);
     }
 
     public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
@@ -662,9 +663,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         streamResult.handleProgress(progress);
     }
 
-    public void received(UUID cfId, int sequenceNumber)
+    public void received(TableId tableId, int sequenceNumber)
     {
-        transfers.get(cfId).complete(sequenceNumber);
+        transfers.get(tableId).complete(sequenceNumber);
     }
 
     /**
@@ -723,13 +724,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     public synchronized void taskCompleted(StreamReceiveTask completedTask)
     {
-        receivers.remove(completedTask.cfId);
+        receivers.remove(completedTask.tableId);
         maybeCompleted();
     }
 
     public synchronized void taskCompleted(StreamTransferTask completedTask)
     {
-        transfers.remove(completedTask.cfId);
+        transfers.remove(completedTask.tableId);
         maybeCompleted();
     }
 
@@ -793,7 +794,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         failIfFinished();
         if (summary.files > 0)
-            receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
+            receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
     }
 
     private void startStreamingFiles()
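
The transfers map keeps the putIfAbsent get-or-create dance, now keyed by TableId (which supplies value-based equals/hashCode). For comparison only, a contract-equivalent sketch using computeIfAbsent:

    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.cassandra.schema.TableId;
    import org.apache.cassandra.streaming.StreamSession;
    import org.apache.cassandra.streaming.StreamTransferTask;

    final class TransferTasks
    {
        private final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap<>();

        // Atomic get-or-create: exactly one task is ever published per TableId.
        StreamTransferTask taskFor(StreamSession session, TableId tableId)
        {
            return transfers.computeIfAbsent(tableId, id -> new StreamTransferTask(session, id));
        }
    }

One difference: computeIfAbsent runs the factory under the map's internal lock, while the patch's putIfAbsent may construct a throwaway task; for a constructor this cheap either choice is reasonable.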

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java
index c427283..0d94f57 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSummary.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -37,7 +38,7 @@ public class StreamSummary implements Serializable
 {
     public static final IVersionedSerializer<StreamSummary> serializer = new StreamSummarySerializer();
 
-    public final UUID cfId;
+    public final TableId tableId;
 
     /**
      * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request.
@@ -45,9 +46,9 @@ public class StreamSummary implements Serializable
     public final int files;
     public final long totalSize;
 
-    public StreamSummary(UUID cfId, int files, long totalSize)
+    public StreamSummary(TableId tableId, int files, long totalSize)
     {
-        this.cfId = cfId;
+        this.tableId = tableId;
         this.files = files;
         this.totalSize = totalSize;
     }
@@ -58,20 +59,20 @@ public class StreamSummary implements Serializable
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         StreamSummary summary = (StreamSummary) o;
-        return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
+        return files == summary.files && totalSize == summary.totalSize && tableId.equals(summary.tableId);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(cfId, files, totalSize);
+        return Objects.hashCode(tableId, files, totalSize);
     }
 
     @Override
     public String toString()
     {
         final StringBuilder sb = new StringBuilder("StreamSummary{");
-        sb.append("path=").append(cfId);
+        sb.append("path=").append(tableId);
         sb.append(", files=").append(files);
         sb.append(", totalSize=").append(totalSize);
         sb.append('}');
@@ -80,25 +81,24 @@ public class StreamSummary implements Serializable
 
     public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary>
     {
-        // arbitrary version is fine for UUIDSerializer for now...
         public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException
         {
-            UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version);
+            summary.tableId.serialize(out);
             out.writeInt(summary.files);
             out.writeLong(summary.totalSize);
         }
 
         public StreamSummary deserialize(DataInputPlus in, int version) throws IOException
         {
-            UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            TableId tableId = TableId.deserialize(in);
             int files = in.readInt();
             long totalSize = in.readLong();
-            return new StreamSummary(cfId, files, totalSize);
+            return new StreamSummary(tableId, files, totalSize);
         }
 
         public long serializedSize(StreamSummary summary, int version)
         {
-            long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version);
+            long size = summary.tableId.serializedSize();
             size += TypeSizes.sizeof(summary.files);
             size += TypeSizes.sizeof(summary.totalSize);
             return size;
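
The serializer trio has to stay mutually consistent, since summaries are sized before they are written. A round-trip sketch of the new TableId path (DataOutputBuffer/DataInputBuffer are the in-tree buffer utilities; their exact constructors are an assumption here):

    import java.io.IOException;

    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;
    import org.apache.cassandra.schema.TableId;

    final class TableIdRoundTrip
    {
        // serialize, serializedSize and deserialize must agree with each other.
        static void check(TableId id) throws IOException
        {
            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                id.serialize(out);
                assert out.getLength() == id.serializedSize();
                try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
                {
                    assert id.equals(TableId.deserialize(in));
                }
            }
        }
    }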

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java
index ac72cff..1e22c34 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTask.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.util.UUID;
+import org.apache.cassandra.schema.TableId;
 
 /**
  * StreamTask is an abstraction of the streaming task performed over a specific ColumnFamily.
@@ -27,12 +27,12 @@ public abstract class StreamTask
     /** StreamSession that this task belongs to */
     protected final StreamSession session;
 
-    protected final UUID cfId;
+    protected final TableId tableId;
 
-    protected StreamTask(StreamSession session, UUID cfId)
+    protected StreamTask(StreamSession session, TableId tableId)
     {
         this.session = session;
-        this.cfId = cfId;
+        this.tableId = tableId;
     }
 
     /**
@@ -56,6 +56,6 @@ public abstract class StreamTask
      */
     public StreamSummary getSummary()
     {
-        return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize());
+        return new StreamSummary(tableId, getTotalNumberOfFiles(), getTotalSize());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 4f313c3..aa3251b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -27,6 +27,7 @@ import com.google.common.base.Throwables;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -47,14 +48,14 @@ public class StreamTransferTask extends StreamTask
 
     private long totalSize;
 
-    public StreamTransferTask(StreamSession session, UUID cfId)
+    public StreamTransferTask(StreamSession session, TableId tableId)
     {
-        super(session, cfId);
+        super(session, tableId);
     }
 
     public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
-        assert ref.get() != null && cfId.equals(ref.get().metadata.cfId);
+        assert ref.get() != null && tableId.equals(ref.get().metadata().id);
         OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
         message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
         files.put(message.header.sequenceNumber, message);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 2044d4d..6ac607f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,9 +25,7 @@ import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.util.TrackedInputStream;
@@ -66,15 +64,12 @@ public class CompressedStreamReader extends StreamReader
     {
         long totalSize = totalSize();
 
-        Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        ColumnFamilyStore cfs = null;
-        if (kscf != null)
-            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
 
-        if (kscf == null || cfs == null)
+        if (cfs == null)
         {
             // schema was dropped during streaming
-            throw new IOException("CF " + cfId + " was dropped during streaming");
+            throw new IOException("CF " + tableId + " was dropped during streaming");
         }
 
         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
@@ -85,7 +80,7 @@ public class CompressedStreamReader extends StreamReader
                                                               ChecksumType.CRC32, cfs::getCrcCheckChance);
         TrackedInputStream in = new TrackedInputStream(cis);
 
-        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata),
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()),
                                                                  totalSize, session.planId());
         SSTableMultiWriter writer = null;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
index 9ef23ab..a1f2496 100644
--- a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
@@ -19,18 +19,18 @@ package org.apache.cassandra.streaming.management;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 import javax.management.openmbean.*;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.StreamSummary;
 
 /**
  */
 public class StreamSummaryCompositeData
 {
-    private static final String[] ITEM_NAMES = new String[]{"cfId",
+    private static final String[] ITEM_NAMES = new String[]{"tableId",
                                                             "files",
                                                             "totalSize"};
     private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
@@ -60,7 +60,7 @@ public class StreamSummaryCompositeData
     public static CompositeData toCompositeData(StreamSummary streamSummary)
     {
         Map<String, Object> valueMap = new HashMap<>();
-        valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
+        valueMap.put(ITEM_NAMES[0], streamSummary.tableId.toString());
         valueMap.put(ITEM_NAMES[1], streamSummary.files);
         valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
         try
@@ -76,7 +76,7 @@ public class StreamSummaryCompositeData
     public static StreamSummary fromCompositeData(CompositeData cd)
     {
         Object[] values = cd.getAll(ITEM_NAMES);
-        return new StreamSummary(UUID.fromString((String) values[0]),
+        return new StreamSummary(TableId.fromString((String) values[0]),
                                  (int) values[1],
                                  (long) values[2]);
     }
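
Since the JMX composite data carries the id as a String, fromCompositeData relies on TableId.toString and TableId.fromString being inverses, the same contract the old UUID pair satisfied. The round trip, in one line:

    import org.apache.cassandra.schema.TableId;

    final class JmxTableIds
    {
        // The CompositeData path stores the id as a String; toString/fromString
        // must round-trip for fromCompositeData to recover the original summary.
        static TableId throughCompositeData(TableId id)
        {
            return TableId.fromString(id.toString());
        }
    }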

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index b0639ea..a37420b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
@@ -30,9 +29,9 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * StreamingFileHeader is appended before sending actual data to describe what it's sending.
@@ -41,7 +40,7 @@ public class FileMessageHeader
 {
     public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
 
-    public final UUID cfId;
+    public final TableId tableId;
     public final int sequenceNumber;
     /** SSTable version */
     public final Version version;
@@ -64,7 +63,7 @@ public class FileMessageHeader
     /* cached size value */
     private transient final long size;
 
-    public FileMessageHeader(UUID cfId,
+    public FileMessageHeader(TableId tableId,
                              int sequenceNumber,
                              Version version,
                              SSTableFormat.Type format,
@@ -75,7 +74,7 @@ public class FileMessageHeader
                              int sstableLevel,
                              SerializationHeader.Component header)
     {
-        this.cfId = cfId;
+        this.tableId = tableId;
         this.sequenceNumber = sequenceNumber;
         this.version = version;
         this.format = format;
@@ -89,7 +88,7 @@ public class FileMessageHeader
         this.size = calculateSize();
     }
 
-    public FileMessageHeader(UUID cfId,
+    public FileMessageHeader(TableId tableId,
                              int sequenceNumber,
                              Version version,
                              SSTableFormat.Type format,
@@ -100,7 +99,7 @@ public class FileMessageHeader
                              int sstableLevel,
                              SerializationHeader.Component header)
     {
-        this.cfId = cfId;
+        this.tableId = tableId;
         this.sequenceNumber = sequenceNumber;
         this.version = version;
         this.format = format;
@@ -152,7 +151,7 @@ public class FileMessageHeader
     public String toString()
     {
         final StringBuilder sb = new StringBuilder("Header (");
-        sb.append("cfId: ").append(cfId);
+        sb.append("tableId: ").append(tableId);
         sb.append(", #").append(sequenceNumber);
         sb.append(", version: ").append(version);
         sb.append(", format: ").append(format);
@@ -171,13 +170,13 @@ public class FileMessageHeader
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         FileMessageHeader that = (FileMessageHeader) o;
-        return sequenceNumber == that.sequenceNumber && cfId.equals(that.cfId);
+        return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
     }
 
     @Override
     public int hashCode()
     {
-        int result = cfId.hashCode();
+        int result = tableId.hashCode();
         result = 31 * result + sequenceNumber;
         return result;
     }
@@ -186,7 +185,7 @@ public class FileMessageHeader
     {
         public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
         {
-            UUIDSerializer.serializer.serialize(header.cfId, out, version);
+            header.tableId.serialize(out);
             out.writeInt(header.sequenceNumber);
             out.writeUTF(header.version.toString());
             out.writeUTF(header.format.name);
@@ -212,7 +211,7 @@ public class FileMessageHeader
 
         public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
         {
-            UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            TableId tableId = TableId.deserialize(in);
             int sequenceNumber = in.readInt();
             Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
             SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
@@ -227,12 +226,12 @@ public class FileMessageHeader
             int sstableLevel = in.readInt();
             SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
+            return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
         {
-            long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
+            long size = header.tableId.serializedSize();
             size += TypeSizes.sizeof(header.sequenceNumber);
             size += TypeSizes.sizeof(header.version.toString());
             size += TypeSizes.sizeof(header.format.name);
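
Header identity stays the (tableId, sequenceNumber) pair, with a hand-rolled 31-multiplier hash. A contract-equivalent sketch (equal headers hash equally; the numeric value differs from the hand-rolled version):

    import java.util.Objects;

    import org.apache.cassandra.schema.TableId;

    final class HeaderIdentity
    {
        // Same equals/hashCode contract as the implementation above,
        // expressed via java.util.Objects.
        static int hash(TableId tableId, int sequenceNumber)
        {
            return Objects.hash(tableId, sequenceNumber);
        }
    }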

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 6723d17..fba9ec4 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -72,7 +72,7 @@ public class OutgoingFileMessage extends StreamMessage
 
         SSTableReader sstable = ref.get();
         filename = sstable.getFilename();
-        this.header = new FileMessageHeader(sstable.metadata.cfId,
+        this.header = new FileMessageHeader(sstable.metadata().id,
                                             sequenceNumber,
                                             sstable.descriptor.version,
                                             sstable.descriptor.formatType,
@@ -81,7 +81,7 @@ public class OutgoingFileMessage extends StreamMessage
                                             sstable.compression ? sstable.getCompressionMetadata() : null,
                                             repairedAt,
                                             keepSSTableLevel ? sstable.getSSTableLevel() : 0,
-                                            sstable.header == null ? null : sstable.header.toComponent());
+                                            sstable.header.toComponent());
     }
 
     public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 251b9c8..55dd7e6 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.streaming.messages;
 import java.io.*;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.util.UUID;
 
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.UUIDSerializer;
 
 public class ReceivedMessage extends StreamMessage
 {
@@ -37,23 +35,23 @@ public class ReceivedMessage extends StreamMessage
         public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
         {
             DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
-            return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
+            return new ReceivedMessage(TableId.deserialize(input), input.readInt());
         }
 
         public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
+            message.tableId.serialize(out);
             out.writeInt(message.sequenceNumber);
         }
     };
 
-    public final UUID cfId;
+    public final TableId tableId;
     public final int sequenceNumber;
 
-    public ReceivedMessage(UUID cfId, int sequenceNumber)
+    public ReceivedMessage(TableId tableId, int sequenceNumber)
     {
         super(Type.RECEIVED);
-        this.cfId = cfId;
+        this.tableId = tableId;
         this.sequenceNumber = sequenceNumber;
     }
 
@@ -61,7 +59,7 @@ public class ReceivedMessage extends StreamMessage
     public String toString()
     {
         final StringBuilder sb = new StringBuilder("Received (");
-        sb.append(cfId).append(", #").append(sequenceNumber).append(')');
+        sb.append(tableId).append(", #").append(sequenceNumber).append(')');
         return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index dde732a..1d05103 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -30,8 +30,9 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -46,14 +47,14 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.impl.Indenter;
-import org.codehaus.jackson.util.DefaultPrettyPrinter;
 import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.codehaus.jackson.util.DefaultPrettyPrinter;
 
 public final class JsonTransformer
 {
@@ -68,7 +69,7 @@ public final class JsonTransformer
 
     private final CompactIndenter arrayIndenter = new CompactIndenter();
 
-    private final CFMetaData metadata;
+    private final TableMetadata metadata;
 
     private final ISSTableScanner currentScanner;
 
@@ -76,7 +77,7 @@ public final class JsonTransformer
 
     private long currentPosition = 0;
 
-    private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, CFMetaData metadata)
+    private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, TableMetadata metadata)
     {
         this.json = json;
         this.metadata = metadata;
@@ -89,7 +90,7 @@ public final class JsonTransformer
         json.setPrettyPrinter(prettyPrinter);
     }
 
-    public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, CFMetaData metadata, OutputStream out)
+    public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, TableMetadata metadata, OutputStream out)
             throws IOException
     {
         try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, StandardCharsets.UTF_8)))
@@ -101,7 +102,7 @@ public final class JsonTransformer
         }
     }
 
-    public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, CFMetaData metadata, OutputStream out) throws IOException
+    public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, TableMetadata metadata, OutputStream out) throws IOException
     {
         try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, StandardCharsets.UTF_8)))
         {
@@ -119,7 +120,7 @@ public final class JsonTransformer
 
     private void serializePartitionKey(DecoratedKey key)
     {
-        AbstractType<?> keyValidator = metadata.getKeyValidator();
+        AbstractType<?> keyValidator = metadata.partitionKeyType;
         objectIndenter.setCompact(true);
         try
         {
@@ -223,7 +224,7 @@ public final class JsonTransformer
         }
         catch (IOException e)
         {
-            String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey());
+            String key = metadata.partitionKeyType.getString(partition.partitionKey().getKey());
             logger.error("Fatal error parsing partition: {}", key, e);
         }
     }
@@ -334,10 +335,10 @@ public final class JsonTransformer
             objectIndenter.setCompact(true);
             json.writeStartArray();
             arrayIndenter.setCompact(true);
-            List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns();
+            List<ColumnMetadata> clusteringColumns = metadata.clusteringColumns();
             for (int i = 0; i < clusteringColumns.size(); i++)
             {
-                ColumnDefinition column = clusteringColumns.get(i);
+                ColumnMetadata column = clusteringColumns.get(i);
                 if (i >= clustering.size())
                 {
                     json.writeString("*");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
index 1f407cb..56c57d9 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.tools;
 
-import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,8 +26,8 @@ import java.util.Set;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -46,7 +45,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
  */
 public class SSTableExpiredBlockers
 {
-    public static void main(String[] args) throws IOException
+    public static void main(String[] args)
     {
         PrintStream out = System.out;
         if (args.length < 2)
@@ -61,11 +60,7 @@ public class SSTableExpiredBlockers
         String columnfamily = args[args.length - 1];
         Schema.instance.loadFromDisk(false);
 
-        CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnfamily);
-        if (metadata == null)
-            throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
-                    keyspace,
-                    columnfamily));
+        TableMetadata metadata = Schema.instance.validateTable(keyspace, columnfamily);
 
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
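
Schema.instance.validateTable folds the lookup and the unknown-table error into a single call, replacing the explicit null check. A usage sketch (the exact exception type validateTable throws is an assumption; the code it replaces threw IllegalArgumentException):

    import org.apache.cassandra.schema.Schema;
    import org.apache.cassandra.schema.TableMetadata;

    final class SchemaLookups
    {
        // Resolve-or-throw: unknown keyspace/table surfaces as an exception
        // rather than a null return, so call sites need no guard.
        static TableMetadata resolve(String keyspace, String table)
        {
            return Schema.instance.validateTable(keyspace, table);
        }
    }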

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 52d5ecf..913ee1f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -27,7 +27,7 @@ import java.util.stream.StreamSupport;
 
 import org.apache.commons.cli.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.DecoratedKey;
@@ -43,6 +43,7 @@ import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -88,10 +89,10 @@ public class SSTableExport
      * Construct table schema from info stored in SSTable's Stats.db
      *
      * @param desc SSTable's descriptor
-     * @return Restored CFMetaData
+     * @return Restored TableMetadata
      * @throws IOException when Stats.db cannot be read
      */
-    public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException
+    public static TableMetadata metadataFromSSTable(Descriptor desc) throws IOException
     {
         if (!desc.version.isCompatible())
             throw new IOException("Cannot process old and unsupported SSTable version.");
@@ -101,7 +102,7 @@ public class SSTableExport
         SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER);
         IPartitioner partitioner = FBUtilities.newPartitioner(desc);
 
-        CFMetaData.Builder builder = CFMetaData.Builder.create("keyspace", "table").withPartitioner(partitioner);
+        TableMetadata.Builder builder = TableMetadata.builder("keyspace", "table").partitioner(partitioner);
         header.getStaticColumns().entrySet().stream()
                 .forEach(entry -> {
                     ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true);
@@ -112,7 +113,7 @@ public class SSTableExport
                     ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true);
                     builder.addRegularColumn(ident, entry.getValue());
                 });
-        builder.addPartitionKey("PartitionKey", header.getKeyType());
+        builder.addPartitionKeyColumn("PartitionKey", header.getKeyType());
         for (int i = 0; i < header.getClusteringTypes().size(); i++)
         {
             builder.addClusteringColumn("clustering" + (i > 0 ? i : ""), header.getClusteringTypes().get(i));
@@ -170,7 +171,7 @@ public class SSTableExport
         Descriptor desc = Descriptor.fromFilename(ssTableFileName);
         try
         {
-            CFMetaData metadata = metadataFromSSTable(desc);
+            TableMetadata metadata = metadataFromSSTable(desc);
             if (cmd.hasOption(ENUMERATE_KEYS_OPTION))
             {
                 try (KeyIterator iter = new KeyIterator(desc, metadata))
@@ -183,14 +184,14 @@ public class SSTableExport
             }
             else
             {
-                SSTableReader sstable = SSTableReader.openNoValidation(desc, metadata);
+                SSTableReader sstable = SSTableReader.openNoValidation(desc, TableMetadataRef.forOfflineTools(metadata));
                 IPartitioner partitioner = sstable.getPartitioner();
                 final ISSTableScanner currentScanner;
                 if ((keys != null) && (keys.length > 0))
                 {
                     List<AbstractBounds<PartitionPosition>> bounds = Arrays.stream(keys)
                             .filter(key -> !excludes.contains(key))
-                            .map(metadata.getKeyValidator()::fromString)
+                            .map(metadata.partitionKeyType::fromString)
                             .map(partitioner::decorateKey)
                             .sorted()
                             .map(DecoratedKey::getToken)
@@ -202,7 +203,7 @@ public class SSTableExport
                     currentScanner = sstable.getScanner();
                 }
                 Stream<UnfilteredRowIterator> partitions = iterToStream(currentScanner).filter(i ->
-                    excludes.isEmpty() || !excludes.contains(metadata.getKeyValidator().getString(i.partitionKey().getKey()))
+                    excludes.isEmpty() || !excludes.contains(metadata.partitionKeyType.getString(i.partitionKey().getKey()))
                 );
                 if (cmd.hasOption(DEBUG_OUTPUT_OPTION))
                 {
@@ -213,19 +214,19 @@ public class SSTableExport
 
                         if (!partition.partitionLevelDeletion().isLive())
                         {
-                            System.out.println("[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" +
+                            System.out.println("[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" +
                                                position.get() + " " + partition.partitionLevelDeletion());
                         }
                         if (!partition.staticRow().isEmpty())
                         {
-                            System.out.println("[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" +
+                            System.out.println("[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" +
                                                position.get() + " " + partition.staticRow().toString(metadata, true));
                         }
                         partition.forEachRemaining(row ->
                         {
                             System.out.println(
-                                    "[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@"
-                                            + position.get() + " " + row.toString(metadata, false, true));
+                            "[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@"
+                            + position.get() + " " + row.toString(metadata, false, true));
                             position.set(currentScanner.getCurrentPosition());
                         });
                     });
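
The metadataFromSSTable change above is the clearest tour of the renamed builder API. A minimal sketch with fixed toy columns, using only the builder calls visible in the diff (OfflineMetadataDemo and the column names are illustrative):

    import org.apache.cassandra.cql3.ColumnIdentifier;
    import org.apache.cassandra.db.marshal.UTF8Type;
    import org.apache.cassandra.schema.TableMetadata;
    import org.apache.cassandra.schema.TableMetadataRef;

    public final class OfflineMetadataDemo
    {
        public static TableMetadataRef build()
        {
            // Same shape as metadataFromSSTable, with hard-coded columns.
            TableMetadata metadata =
                TableMetadata.builder("keyspace", "table")
                             .addPartitionKeyColumn("PartitionKey", UTF8Type.instance)
                             .addClusteringColumn("clustering", UTF8Type.instance)
                             .addRegularColumn(ColumnIdentifier.getInterned("value", true), UTF8Type.instance)
                             .build();

            // TableMetadata is now immutable; offline tools wrap it in a ref
            // before handing it to SSTableReader.openNoValidation.
            return TableMetadataRef.forOfflineTools(metadata);
        }
    }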

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index 915edf1..3a66ef9 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -21,7 +21,7 @@ import java.io.PrintStream;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -69,7 +69,7 @@ public class SSTableLevelResetter
             String keyspaceName = args[1];
             String columnfamily = args[2];
             // validate columnfamily
-            if (Schema.instance.getCFMetaData(keyspaceName, columnfamily) == null)
+            if (Schema.instance.getTableMetadataRef(keyspaceName, columnfamily) == null)
             {
                 System.err.println("ColumnFamily not found: " + keyspaceName + "/" + columnfamily);
                 System.exit(1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index 9f0395b..1116575 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -34,7 +34,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
@@ -92,7 +92,7 @@ public class SSTableOfflineRelevel
         String columnfamily = args[args.length - 1];
         Schema.instance.loadFromDisk(false);
 
-        if (Schema.instance.getCFMetaData(keyspace, columnfamily) == null)
+        if (Schema.instance.getTableMetadataRef(keyspace, columnfamily) == null)
             throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s",
                     keyspace,
                     columnfamily));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
index 2e8ee0b..adfe7e0 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java
@@ -18,9 +18,8 @@
  */
 package org.apache.cassandra.tools;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -52,7 +51,7 @@ public class StandaloneSSTableUtil
             Util.initDatabaseDescriptor();
             Schema.instance.loadFromDisk(false);
 
-            CFMetaData metadata = Schema.instance.getCFMetaData(options.keyspaceName, options.cfName);
+            TableMetadata metadata = Schema.instance.getTableMetadata(options.keyspaceName, options.cfName);
             if (metadata == null)
                 throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
                                                                  options.keyspaceName,
@@ -82,7 +81,7 @@ public class StandaloneSSTableUtil
         }
     }
 
-    private static void listFiles(Options options, CFMetaData metadata, OutputHandler handler) throws IOException
+    private static void listFiles(Options options, TableMetadata metadata, OutputHandler handler) throws IOException
     {
         Directories directories = new Directories(metadata, ColumnFamilyStore.getInitialDirectories());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 54b340e..f7f48c8 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.commons.cli.*;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -61,7 +61,7 @@ public class StandaloneScrubber
             // load keyspace descriptions.
             Schema.instance.loadFromDisk(false);
 
-            if (Schema.instance.getKSMetaData(options.keyspaceName) == null)
+            if (Schema.instance.getKeyspaceMetadata(options.keyspaceName) == null)
                 throw new IllegalArgumentException(String.format("Unknown keyspace %s", options.keyspaceName));
 
             // Do not load sstables since they might be broken

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index aaaa9db..c5be02e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -22,7 +22,7 @@ import java.io.File;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index e55b3a8..ed25e42 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -1,5 +1,5 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.*;
 
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -33,6 +32,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 
@@ -55,7 +55,7 @@ public class StandaloneUpgrader
             // load keyspace descriptions.
             Schema.instance.loadFromDisk(false);
 
-            if (Schema.instance.getCFMetaData(options.keyspace, options.cf) == null)
+            if (Schema.instance.getTableMetadataRef(options.keyspace, options.cf) == null)
                 throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
                                                                  options.keyspace,
                                                                  options.cf));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index ee55dd5..40dfbf7 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -18,7 +18,7 @@
  */
 package org.apache.cassandra.tools;
 
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
@@ -56,7 +56,7 @@ public class StandaloneVerifier
 
             boolean hasFailed = false;
 
-            if (Schema.instance.getCFMetaData(options.keyspaceName, options.cfName) == null)
+            if (Schema.instance.getTableMetadataRef(options.keyspaceName, options.cfName) == null)
                 throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
                                                                  options.keyspaceName,
                                                                  options.cfName));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
index c964b2f..4102846 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.airlift.command.Option;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 350601a..48f929f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -31,7 +31,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.tools.NodeProbe;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index ac8b4f7..20c992c 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -20,18 +20,23 @@ package org.apache.cassandra.tracing;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static java.lang.String.format;
+
 public final class TraceKeyspace
 {
     private TraceKeyspace()
@@ -41,36 +46,41 @@ public final class TraceKeyspace
     public static final String SESSIONS = "sessions";
     public static final String EVENTS = "events";
 
-    private static final CFMetaData Sessions =
-        compile(SESSIONS,
-                "tracing sessions",
-                "CREATE TABLE %s ("
-                + "session_id uuid,"
-                + "command text,"
-                + "client inet,"
-                + "coordinator inet,"
-                + "duration int,"
-                + "parameters map<text, text>,"
-                + "request text,"
-                + "started_at timestamp,"
-                + "PRIMARY KEY ((session_id)))");
-
-    private static final CFMetaData Events =
-        compile(EVENTS,
-                "tracing events",
-                "CREATE TABLE %s ("
-                + "session_id uuid,"
-                + "event_id timeuuid,"
-                + "activity text,"
-                + "source inet,"
-                + "source_elapsed int,"
-                + "thread text,"
-                + "PRIMARY KEY ((session_id), event_id))");
-
-    private static CFMetaData compile(String name, String description, String schema)
+    private static final TableMetadata Sessions =
+        parse(SESSIONS,
+              "tracing sessions",
+              "CREATE TABLE %s ("
+              + "session_id uuid,"
+              + "command text,"
+              + "client inet,"
+              + "coordinator inet,"
+              + "duration int,"
+              + "parameters map<text, text>,"
+              + "request text,"
+              + "started_at timestamp,"
+              + "PRIMARY KEY ((session_id)))");
+
+    private static final TableMetadata Events =
+        parse(EVENTS,
+              "tracing events",
+              "CREATE TABLE %s ("
+              + "session_id uuid,"
+              + "event_id timeuuid,"
+              + "activity text,"
+              + "source inet,"
+              + "source_elapsed int,"
+              + "thread text,"
+              + "PRIMARY KEY ((session_id), event_id))");
+
+    private static TableMetadata parse(String table, String description, String cql)
     {
-        return CFMetaData.compile(String.format(schema, name), SchemaConstants.TRACE_KEYSPACE_NAME)
-                         .comment(description);
+        return CreateTableStatement.parse(format(cql, table), SchemaConstants.TRACE_KEYSPACE_NAME)
+                                   .id(TableId.forSystemTable(SchemaConstants.TRACE_KEYSPACE_NAME, table))
+                                   .dcLocalReadRepairChance(0.0)
+                                   .gcGraceSeconds(0)
+                                   .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1))
+                                   .comment(description)
+                                   .build();
     }
 
     public static KeyspaceMetadata metadata()
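
For context, the two tables parsed above presumably feed the metadata() method at the cut just below; a sketch of how that assembly likely looks, with KeyspaceMetadata.create, KeyspaceParams.simple, and Tables.of assumed from the imports rather than shown in this hunk:

    public static KeyspaceMetadata metadata()
    {
        // Assumed assembly: keyspace name, replication params, parsed tables.
        return KeyspaceMetadata.create(SchemaConstants.TRACE_KEYSPACE_NAME,
                                       KeyspaceParams.simple(2),
                                       Tables.of(Sessions, Events));
    }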

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 1eeecac..f38d83d 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -49,6 +49,8 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaChangeListener;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.transport.messages.EventMessage;
@@ -100,7 +102,7 @@ public class Server implements CassandraDaemon.Server
             eventExecutorGroup = builder.eventExecutorGroup;
         EventNotifier notifier = new EventNotifier(this);
         StorageService.instance.register(notifier);
-        MigrationManager.instance.register(notifier);
+        Schema.instance.registerListener(notifier);
     }
 
     public void stop()
@@ -448,7 +450,7 @@ public class Server implements CassandraDaemon.Server
         }
     }
 
-    private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber
+    private static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
     {
         private final Server server;
 
@@ -584,12 +586,12 @@ public class Server implements CassandraDaemon.Server
             send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
         }
 
-        public void onCreateColumnFamily(String ksName, String cfName)
+        public void onCreateTable(String ksName, String cfName)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
         }
 
-        public void onCreateUserType(String ksName, String typeName)
+        public void onCreateType(String ksName, String typeName)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
@@ -606,28 +608,28 @@ public class Server implements CassandraDaemon.Server
                                         ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
-        public void onUpdateKeyspace(String ksName)
+        public void onAlterKeyspace(String ksName)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
         }
 
-        public void onUpdateColumnFamily(String ksName, String cfName, boolean affectsStatements)
+        public void onAlterTable(String ksName, String cfName, boolean affectsStatements)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
         }
 
-        public void onUpdateUserType(String ksName, String typeName)
+        public void onAlterType(String ksName, String typeName)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
-        public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+        public void onAlterFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
                                         ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
         }
 
-        public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+        public void onAlterAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
                                         ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
@@ -638,12 +640,12 @@ public class Server implements CassandraDaemon.Server
             send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
         }
 
-        public void onDropColumnFamily(String ksName, String cfName)
+        public void onDropTable(String ksName, String cfName)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
         }
 
-        public void onDropUserType(String ksName, String typeName)
+        public void onDropType(String ksName, String typeName)
         {
             send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
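
The EventNotifier changes are a straight rename from MigrationListener to SchemaChangeListener, with registration moving from MigrationManager to Schema. A minimal listener sketch under those assumptions (SchemaListenerDemo and the log message are illustrative):

    import org.apache.cassandra.schema.Schema;
    import org.apache.cassandra.schema.SchemaChangeListener;

    public final class SchemaListenerDemo extends SchemaChangeListener
    {
        @Override
        public void onCreateTable(String ksName, String cfName)
        {
            // React to the renamed callback (formerly onCreateColumnFamily).
            System.out.println("table created: " + ksName + "." + cfName);
        }

        public static void main(String[] args)
        {
            // Registration now goes through Schema.instance rather than
            // MigrationManager.instance.register(...).
            Schema.instance.registerListener(new SchemaListenerDemo());
        }
    }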

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 703e69a..906b342 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.CassandraException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TriggerMetadata;
 import org.apache.cassandra.schema.Triggers;
 import org.apache.cassandra.utils.FBUtilities;
@@ -86,7 +87,7 @@ public class TriggerExecutor
         if (intermediate == null || intermediate.isEmpty())
             return updates;
 
-        return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().cfId, updates.partitionKey(), intermediate));
+        return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().id, updates.partitionKey(), intermediate));
     }
 
     /**
@@ -157,9 +158,9 @@ public class TriggerExecutor
         return merged;
     }
 
-    private List<PartitionUpdate> validateForSinglePartition(UUID cfId,
-                                                                   DecoratedKey key,
-                                                                   Collection<Mutation> tmutations)
+    private List<PartitionUpdate> validateForSinglePartition(TableId tableId,
+                                                             DecoratedKey key,
+                                                             Collection<Mutation> tmutations)
     throws InvalidRequestException
     {
         validate(tmutations);
@@ -169,7 +170,7 @@ public class TriggerExecutor
             List<PartitionUpdate> updates = Lists.newArrayList(Iterables.getOnlyElement(tmutations).getPartitionUpdates());
             if (updates.size() > 1)
                 throw new InvalidRequestException("The updates generated by triggers are not all for the same partition");
-            validateSamePartition(cfId, key, Iterables.getOnlyElement(updates));
+            validateSamePartition(tableId, key, Iterables.getOnlyElement(updates));
             return updates;
         }
 
@@ -178,20 +179,20 @@ public class TriggerExecutor
         {
             for (PartitionUpdate update : mutation.getPartitionUpdates())
             {
-                validateSamePartition(cfId, key, update);
+                validateSamePartition(tableId, key, update);
                 updates.add(update);
             }
         }
         return updates;
     }
 
-    private void validateSamePartition(UUID cfId, DecoratedKey key, PartitionUpdate update)
+    private void validateSamePartition(TableId tableId, DecoratedKey key, PartitionUpdate update)
     throws InvalidRequestException
     {
         if (!key.equals(update.partitionKey()))
             throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");
 
-        if (!cfId.equals(update.metadata().cfId))
+        if (!tableId.equals(update.metadata().id))
             throw new InvalidRequestException("table of additional mutation does not match primary update table");
     }
 
@@ -211,7 +212,7 @@ public class TriggerExecutor
      */
     private List<Mutation> executeInternal(PartitionUpdate update)
     {
-        Triggers triggers = update.metadata().getTriggers();
+        Triggers triggers = update.metadata().triggers;
         if (triggers.isEmpty())
             return null;
         List<Mutation> tmutations = Lists.newLinkedList();
@@ -238,7 +239,7 @@ public class TriggerExecutor
         }
         catch (Exception ex)
         {
-            throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", update.metadata().cfId), ex);
+            throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", update.metadata().id), ex);
         }
         finally
         {
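
The TriggerExecutor changes swap the raw UUID cfId for the new TableId type throughout; equality checks carry over unchanged, as the diff's tableId.equals(update.metadata().id) shows. A small standalone sketch of the same-partition guard in its new shape (the helper class name is illustrative):

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.db.partitions.PartitionUpdate;
    import org.apache.cassandra.exceptions.InvalidRequestException;
    import org.apache.cassandra.schema.TableId;

    public final class SamePartitionGuard
    {
        public static void check(TableId tableId, DecoratedKey key, PartitionUpdate update)
        throws InvalidRequestException
        {
            if (!key.equals(update.partitionKey()))
                throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");

            // Table identity is now a TableId rather than a UUID cfId.
            if (!tableId.equals(update.metadata().id))
                throw new InvalidRequestException("table of additional mutation does not match primary update table");
        }
    }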