You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:26 UTC

[20/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 874c679..26746ad 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -24,9 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -43,6 +41,9 @@ import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 /**
@@ -75,24 +76,24 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     protected SSTableWriter(Descriptor descriptor,
                             long keyCount,
                             long repairedAt,
-                            CFMetaData metadata,
+                            TableMetadataRef metadata,
                             MetadataCollector metadataCollector,
                             SerializationHeader header,
                             Collection<SSTableFlushObserver> observers)
     {
-        super(descriptor, components(metadata), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+        super(descriptor, components(metadata.get()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
         this.metadataCollector = metadataCollector;
-        this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable
-        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
+        this.header = header;
+        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header);
         this.observers = observers == null ? Collections.emptySet() : observers;
     }
 
     public static SSTableWriter create(Descriptor descriptor,
                                        Long keyCount,
                                        Long repairedAt,
-                                       CFMetaData metadata,
+                                       TableMetadataRef metadata,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
@@ -110,11 +111,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
-        CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
+        TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
         return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
     }
 
-    public static SSTableWriter create(CFMetaData metadata,
+    public static SSTableWriter create(TableMetadataRef metadata,
                                        Descriptor descriptor,
                                        long keyCount,
                                        long repairedAt,
@@ -123,7 +124,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
-        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+        MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
         return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
     }
 
@@ -138,7 +139,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
     }
 
-    private static Set<Component> components(CFMetaData metadata)
+    private static Set<Component> components(TableMetadata metadata)
     {
         Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
                 Component.PRIMARY_INDEX,
@@ -299,7 +300,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     protected Map<MetadataType, MetadataComponent> finalizeMetadata()
     {
         return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
-                                                  metadata.params.bloomFilterFpChance,
+                                                  metadata().params.bloomFilterFpChance,
                                                   repairedAt,
                                                   header);
     }
@@ -329,7 +330,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         public abstract SSTableWriter open(Descriptor descriptor,
                                            long keyCount,
                                            long repairedAt,
-                                           CFMetaData metadata,
+                                           TableMetadataRef metadata,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,
                                            Collection<SSTableFlushObserver> observers,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 02b685b..d949197 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.io.sstable.format.big;
 import java.util.Collection;
 import java.util.Set;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -71,7 +72,7 @@ public class BigFormat implements SSTableFormat
     }
 
     @Override
-    public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData metadata, Version version, SerializationHeader header)
+    public RowIndexEntry.IndexSerializer getIndexSerializer(TableMetadata metadata, Version version, SerializationHeader header)
     {
         return new RowIndexEntry.Serializer(version, header);
     }
@@ -82,7 +83,7 @@ public class BigFormat implements SSTableFormat
         public SSTableWriter open(Descriptor descriptor,
                                   long keyCount,
                                   long repairedAt,
-                                  CFMetaData metadata,
+                                  TableMetadataRef metadata,
                                   MetadataCollector metadataCollector,
                                   SerializationHeader header,
                                   Collection<SSTableFlushObserver> observers,
@@ -95,7 +96,7 @@ public class BigFormat implements SSTableFormat
     static class ReaderFactory extends SSTableReader.Factory
     {
         @Override
-        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+        public SSTableReader open(Descriptor descriptor, Set<Component> components, TableMetadataRef metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
         {
             return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 1fded2f..c29bc5d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -17,12 +17,18 @@
  */
 package org.apache.cassandra.io.sstable.format.big;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.cache.KeyCacheKey;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.columniterator.SSTableIterator;
 import org.apache.cassandra.db.columniterator.SSTableReversedIterator;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
@@ -33,14 +39,9 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
 
 /**
  * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
@@ -50,7 +51,7 @@ public class BigTableReader extends SSTableReader
 {
     private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 
-    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+    BigTableReader(Descriptor desc, Set<Component> components, TableMetadataRef metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
     {
         super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
     }
@@ -64,7 +65,7 @@ public class BigTableReader extends SSTableReader
     public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed)
     {
         if (indexEntry == null)
-            return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
+            return UnfilteredRowIterators.noRowsIterator(metadata(), key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);
         return reversed
              ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, ifile)
              : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, ifile);
@@ -145,7 +146,9 @@ public class BigTableReader extends SSTableReader
         // next, the key cache (only make sense for valid row key)
         if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
         {
-            RowIndexEntry cachedPosition = getCachedPosition((DecoratedKey)key, updateCacheAndStats);
+            DecoratedKey decoratedKey = (DecoratedKey)key;
+            KeyCacheKey cacheKey = new KeyCacheKey(metadata(), descriptor, decoratedKey.getKey());
+            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
             if (cachedPosition != null)
             {
                 Tracing.trace("Key cache hit for sstable {}", descriptor.generation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 1b33f5b..716ef4c 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.AbstractIterator;
 import com.google.common.collect.Iterators;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
@@ -68,7 +68,7 @@ public class BigTableScanner implements ISSTableScanner
     // Full scan of the sstables
     public static ISSTableScanner getScanner(SSTableReader sstable)
     {
-        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, Iterators.singletonIterator(fullRange(sstable)));
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, Iterators.singletonIterator(fullRange(sstable)));
     }
 
     public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange)
@@ -83,12 +83,12 @@ public class BigTableScanner implements ISSTableScanner
         if (positions.isEmpty())
             return new EmptySSTableScanner(sstable);
 
-        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, makeBounds(sstable, tokenRanges).iterator());
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, makeBounds(sstable, tokenRanges).iterator());
     }
 
     public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
     {
-        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, rangeIterator);
+        return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata()), null, rangeIterator);
     }
 
     private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
@@ -100,7 +100,7 @@ public class BigTableScanner implements ISSTableScanner
         this.sstable = sstable;
         this.columns = columns;
         this.dataRange = dataRange;
-        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata(),
                                                                                                         sstable.descriptor.version,
                                                                                                         sstable.header);
         this.rangeIterator = rangeIterator;
@@ -237,9 +237,10 @@ public class BigTableScanner implements ISSTableScanner
         return sstable.toString();
     }
 
-    public CFMetaData metadata()
+
+    public TableMetadata metadata()
     {
-        return sstable.metadata;
+        return sstable.metadata();
     }
 
     public boolean hasNext()
@@ -414,9 +415,9 @@ public class BigTableScanner implements ISSTableScanner
             return sstable.getFilename();
         }
 
-        public CFMetaData metadata()
+        public TableMetadata metadata()
         {
-            return sstable.metadata;
+            return sstable.metadata();
         }
 
         public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 018edac..e134f2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.ChunkCache;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -47,6 +46,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
@@ -71,7 +71,7 @@ public class BigTableWriter extends SSTableWriter
     public BigTableWriter(Descriptor descriptor,
                           long keyCount,
                           long repairedAt,
-                          CFMetaData metadata,
+                          TableMetadataRef metadata,
                           MetadataCollector metadataCollector, 
                           SerializationHeader header,
                           Collection<SSTableFlushObserver> observers,
@@ -86,7 +86,7 @@ public class BigTableWriter extends SSTableWriter
                                              descriptor.filenameFor(Component.COMPRESSION_INFO),
                                              new File(descriptor.filenameFor(Component.DIGEST)),
                                              writerOption,
-                                             metadata.params.compression,
+                                             metadata().params.compression,
                                              metadataCollector);
         }
         else
@@ -207,8 +207,8 @@ public class BigTableWriter extends SSTableWriter
     {
         if (rowSize > DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
         {
-            String keyString = metadata.getKeyValidator().getString(key.getKey());
-            logger.warn("Writing large partition {}/{}:{} ({}) to sstable {}", metadata.ksName, metadata.cfName, keyString, FBUtilities.prettyPrintMemory(rowSize), getFilename());
+            String keyString = metadata().partitionKeyType.getString(key.getKey());
+            logger.warn("Writing large partition {}/{}:{} ({}) to sstable {}", metadata.keyspace, metadata.name, keyString, FBUtilities.prettyPrintMemory(rowSize), getFilename());
         }
     }
 
@@ -280,7 +280,7 @@ public class BigTableWriter extends SSTableWriter
         StatsMetadata stats = statsMetadata();
         assert boundary.indexLength > 0 && boundary.dataLength > 0;
         // open the reader early
-        IndexSummary indexSummary = iwriter.summary.build(metadata.partitioner, boundary);
+        IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner, boundary);
         long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
         int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
         FileHandle ifile = iwriter.builder.bufferSize(indexBufferSize).complete(boundary.indexLength);
@@ -326,7 +326,7 @@ public class BigTableWriter extends SSTableWriter
 
         StatsMetadata stats = statsMetadata();
         // finalize in-memory state for the reader
-        IndexSummary indexSummary = iwriter.summary.build(this.metadata.partitioner);
+        IndexSummary indexSummary = iwriter.summary.build(metadata().partitioner);
         long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
         int dataBufferSize = optimizationStrategy.bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
         int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
@@ -337,7 +337,7 @@ public class BigTableWriter extends SSTableWriter
         invalidateCacheAtBoundary(dfile);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor,
                                                            components,
-                                                           this.metadata,
+                                                           metadata,
                                                            ifile,
                                                            dfile,
                                                            indexSummary,
@@ -441,8 +441,8 @@ public class BigTableWriter extends SSTableWriter
             indexFile = new SequentialWriter(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), writerOption);
             builder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX)).mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
             chunkCache.ifPresent(builder::withChunkCache);
-            summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
-            bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true);
+            summary = new IndexSummaryBuilder(keyCount, metadata().params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
+            bf = FilterFactory.getFilter(keyCount, metadata().params.bloomFilterFpChance, true);
             // register listeners to be alerted when the data files are flushed
             indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset()));
             dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index 96ca38c..d54090b 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -24,12 +24,12 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -110,25 +110,25 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
                 // currently running compactions
                 for (CompactionInfo.Holder compaction : compactions)
                 {
-                    CFMetaData metaData = compaction.getCompactionInfo().getCFMetaData();
+                    TableMetadata metaData = compaction.getCompactionInfo().getTableMetadata();
                     if (metaData == null)
                     {
                         continue;
                     }
-                    if (!resultMap.containsKey(metaData.ksName))
+                    if (!resultMap.containsKey(metaData.keyspace))
                     {
-                        resultMap.put(metaData.ksName, new HashMap<>());
+                        resultMap.put(metaData.keyspace, new HashMap<>());
                     }
 
-                    Map<String, Integer> tableNameToCountMap = resultMap.get(metaData.ksName);
-                    if (tableNameToCountMap.containsKey(metaData.cfName))
+                    Map<String, Integer> tableNameToCountMap = resultMap.get(metaData.keyspace);
+                    if (tableNameToCountMap.containsKey(metaData.name))
                     {
-                        tableNameToCountMap.put(metaData.cfName,
-                                                tableNameToCountMap.get(metaData.cfName) + 1);
+                        tableNameToCountMap.put(metaData.name,
+                                                tableNameToCountMap.get(metaData.name) + 1);
                     }
                     else
                     {
-                        tableNameToCountMap.put(metaData.cfName, 1);
+                        tableNameToCountMap.put(metaData.name, 1);
                     }
                 }
                 return resultMap;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 0a726d4..126abed 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -27,8 +27,8 @@ import com.google.common.collect.Maps;
 
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
@@ -668,7 +668,7 @@ public class TableMetrics
 
         // We do not want to capture view mutation specific metrics for a view
         // They only makes sense to capture on the base table
-        if (cfs.metadata.isView())
+        if (cfs.metadata().isView())
         {
             viewLockAcquireTime = null;
             viewReadTime = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 53e53a4..67e54c8 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -36,9 +36,8 @@ import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
 
 import org.apache.cassandra.config.Config;
-import org.xerial.snappy.SnappyInputStream;
+import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
@@ -98,9 +97,9 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos
             logger.trace("eof reading from socket; closing", e);
             // connection will be reset so no need to throw an exception.
         }
-        catch (UnknownColumnFamilyException e)
+        catch (UnknownTableException e)
         {
-            logger.warn("UnknownColumnFamilyException reading from socket; closing", e);
+            logger.warn("UnknownTableException reading from socket; closing", e);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 7215397..ef521ef 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -74,6 +74,8 @@ import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
 import org.apache.cassandra.metrics.MessagingMetrics;
 import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.paxos.Commit;
@@ -1119,9 +1121,9 @@ public final class MessagingService implements MessagingServiceMBean
     {
         assert mutation != null : "Mutation should not be null when updating dropped mutations count";
 
-        for (UUID columnFamilyId : mutation.getColumnFamilyIds())
+        for (TableId tableId : mutation.getTableIds())
         {
-            ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(columnFamilyId);
+            ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId);
             if (cfs != null)
             {
                 cfs.metric.droppedMutations.inc();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 52625bf..d7736f0 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.*;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
 
 /**
@@ -56,14 +57,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 case PREPARE_MESSAGE:
                     PrepareMessage prepareMessage = (PrepareMessage) message.payload;
                     logger.debug("Preparing, {}", prepareMessage);
-                    List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
-                    for (UUID cfId : prepareMessage.cfIds)
+                    List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.tableIds.size());
+                    for (TableId tableId : prepareMessage.tableIds)
                     {
-                        ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(cfId);
+                        ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(tableId);
                         if (columnFamilyStore == null)
                         {
                             logErrorAndSendFailureResponse(String.format("Table with id %s was dropped during prepare phase of repair",
-                                                                         cfId.toString()), message.from, id);
+                                                                         tableId), message.from, id);
                             return;
                         }
                         columnFamilyStores.add(columnFamilyStore);
@@ -91,7 +92,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
                     if (prs.isGlobal)
                     {
-                        prs.maybeSnapshot(cfs.metadata.cfId, desc.parentSessionId);
+                        prs.maybeSnapshot(cfs.metadata.id, desc.parentSessionId);
                     }
                     else
                     {
@@ -100,7 +101,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                             public boolean apply(SSTableReader sstable)
                             {
                                 return sstable != null &&
-                                       !sstable.metadata.isIndex() && // exclude SSTables from 2i
+                                       !sstable.metadata().isIndex() && // exclude SSTables from 2i
                                        new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(desc.ranges);
                             }
                         }, true, false); //ephemeral snapshot, if repair fails, it will be cleaned next startup

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index c9eed54..6f7297b 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 638cf38..fc7aab4 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,11 +37,10 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
@@ -50,9 +48,14 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static java.lang.String.format;
+
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public final class SystemDistributedKeyspace
@@ -69,56 +72,58 @@ public final class SystemDistributedKeyspace
 
     public static final String VIEW_BUILD_STATUS = "view_build_status";
 
-    private static final CFMetaData RepairHistory =
-        compile(REPAIR_HISTORY,
-                "Repair history",
-                "CREATE TABLE %s ("
-                     + "keyspace_name text,"
-                     + "columnfamily_name text,"
-                     + "id timeuuid,"
-                     + "parent_id timeuuid,"
-                     + "range_begin text,"
-                     + "range_end text,"
-                     + "coordinator inet,"
-                     + "participants set<inet>,"
-                     + "exception_message text,"
-                     + "exception_stacktrace text,"
-                     + "status text,"
-                     + "started_at timestamp,"
-                     + "finished_at timestamp,"
-                     + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");
-
-    private static final CFMetaData ParentRepairHistory =
-        compile(PARENT_REPAIR_HISTORY,
-                "Repair history",
-                "CREATE TABLE %s ("
-                     + "parent_id timeuuid,"
-                     + "keyspace_name text,"
-                     + "columnfamily_names set<text>,"
-                     + "started_at timestamp,"
-                     + "finished_at timestamp,"
-                     + "exception_message text,"
-                     + "exception_stacktrace text,"
-                     + "requested_ranges set<text>,"
-                     + "successful_ranges set<text>,"
-                     + "options map<text, text>,"
-                     + "PRIMARY KEY (parent_id))");
-
-    private static final CFMetaData ViewBuildStatus =
-    compile(VIEW_BUILD_STATUS,
-            "Materialized View build status",
-            "CREATE TABLE %s ("
-                     + "keyspace_name text,"
-                     + "view_name text,"
-                     + "host_id uuid,"
-                     + "status text,"
-                     + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
-
-    private static CFMetaData compile(String name, String description, String schema)
+    private static final TableMetadata RepairHistory =
+        parse(REPAIR_HISTORY,
+              "Repair history",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "columnfamily_name text,"
+              + "id timeuuid,"
+              + "parent_id timeuuid,"
+              + "range_begin text,"
+              + "range_end text,"
+              + "coordinator inet,"
+              + "participants set<inet>,"
+              + "exception_message text,"
+              + "exception_stacktrace text,"
+              + "status text,"
+              + "started_at timestamp,"
+              + "finished_at timestamp,"
+              + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");
+
+    private static final TableMetadata ParentRepairHistory =
+        parse(PARENT_REPAIR_HISTORY,
+              "Repair history",
+              "CREATE TABLE %s ("
+              + "parent_id timeuuid,"
+              + "keyspace_name text,"
+              + "columnfamily_names set<text>,"
+              + "started_at timestamp,"
+              + "finished_at timestamp,"
+              + "exception_message text,"
+              + "exception_stacktrace text,"
+              + "requested_ranges set<text>,"
+              + "successful_ranges set<text>,"
+              + "options map<text, text>,"
+              + "PRIMARY KEY (parent_id))");
+
+    private static final TableMetadata ViewBuildStatus =
+        parse(VIEW_BUILD_STATUS,
+              "Materialized View build status",
+              "CREATE TABLE %s ("
+              + "keyspace_name text,"
+              + "view_name text,"
+              + "host_id uuid,"
+              + "status text,"
+              + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
+
+    private static TableMetadata parse(String table, String description, String cql)
     {
-        return CFMetaData.compile(String.format(schema, name), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
-                         .comment(description)
-                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(10));
+        return CreateTableStatement.parse(format(cql, table), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
+                                   .id(TableId.forSystemTable(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, table))
+                                   .dcLocalReadRepairChance(0.0)
+                                   .comment(description)
+                                   .build();
     }
 
     public static KeyspaceMetadata metadata()
@@ -131,7 +136,7 @@ public final class SystemDistributedKeyspace
         Collection<Range<Token>> ranges = options.getRanges();
         String query = "INSERT INTO %s.%s (parent_id, keyspace_name, columnfamily_names, requested_ranges, started_at,          options)"+
                                  " VALUES (%s,        '%s',          { '%s' },           { '%s' },          toTimestamp(now()), { %s })";
-        String fmtQry = String.format(query,
+        String fmtQry = format(query,
                                       SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
                                       PARENT_REPAIR_HISTORY,
                                       parent_id.toString(),
@@ -154,7 +159,7 @@ public final class SystemDistributedKeyspace
                 if (!first)
                     map.append(',');
                 first = false;
-                map.append(String.format("'%s': '%s'", entry.getKey(), entry.getValue()));
+                map.append(format("'%s': '%s'", entry.getKey(), entry.getValue()));
             }
         }
         return map.toString();
@@ -167,14 +172,14 @@ public final class SystemDistributedKeyspace
         StringWriter sw = new StringWriter();
         PrintWriter pw = new PrintWriter(sw);
         t.printStackTrace(pw);
-        String fmtQuery = String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, parent_id.toString());
+        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, parent_id.toString());
         processSilent(fmtQuery, t.getMessage(), sw.toString());
     }
 
     public static void successfulParentRepair(UUID parent_id, Collection<Range<Token>> successfulRanges)
     {
         String query = "UPDATE %s.%s SET finished_at = toTimestamp(now()), successful_ranges = {'%s'} WHERE parent_id=%s";
-        String fmtQuery = String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, Joiner.on("','").join(successfulRanges), parent_id.toString());
+        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, PARENT_REPAIR_HISTORY, Joiner.on("','").join(successfulRanges), parent_id.toString());
         processSilent(fmtQuery);
     }
 
@@ -194,7 +199,7 @@ public final class SystemDistributedKeyspace
         {
             for (Range<Token> range : ranges)
             {
-                String fmtQry = String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+                String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
                                               keyspaceName,
                                               cfname,
                                               id.toString(),
@@ -218,7 +223,7 @@ public final class SystemDistributedKeyspace
     public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
     {
         String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
-        String fmtQuery = String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+        String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
                                         RepairState.SUCCESS.toString(),
                                         keyspaceName,
                                         cfname,
@@ -232,7 +237,7 @@ public final class SystemDistributedKeyspace
         StringWriter sw = new StringWriter();
         PrintWriter pw = new PrintWriter(sw);
         t.printStackTrace(pw);
-        String fmtQry = String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+        String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
                                       RepairState.FAILED.toString(),
                                       keyspaceName,
                                       cfname,
@@ -243,7 +248,7 @@ public final class SystemDistributedKeyspace
     public static void startViewBuild(String keyspace, String view, UUID hostId)
     {
         String query = "INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)";
-        QueryProcessor.process(String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
+        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
                                ConsistencyLevel.ONE,
                                Lists.newArrayList(bytes(keyspace),
                                                   bytes(view),
@@ -254,7 +259,7 @@ public final class SystemDistributedKeyspace
     public static void successfulViewBuild(String keyspace, String view, UUID hostId)
     {
         String query = "UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?";
-        QueryProcessor.process(String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
+        QueryProcessor.process(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
                                ConsistencyLevel.ONE,
                                Lists.newArrayList(bytes(BuildStatus.SUCCESS.toString()),
                                                   bytes(keyspace),
@@ -268,7 +273,7 @@ public final class SystemDistributedKeyspace
         UntypedResultSet results;
         try
         {
-            results = QueryProcessor.execute(String.format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
+            results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS),
                                              ConsistencyLevel.ONE,
                                              keyspace,
                                              view);
@@ -290,7 +295,7 @@ public final class SystemDistributedKeyspace
     public static void setViewRemoved(String keyspaceName, String viewName)
     {
         String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
-        QueryProcessor.executeInternal(String.format(buildReq, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS), keyspaceName, viewName);
+        QueryProcessor.executeInternal(format(buildReq, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, VIEW_BUILD_STATUS), keyspaceName, viewName);
         forceBlockingFlush(VIEW_BUILD_STATUS);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index b3efeae..9903114 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -30,13 +30,14 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 
 public class PrepareMessage extends RepairMessage
 {
     public final static MessageSerializer serializer = new PrepareMessageSerializer();
-    public final List<UUID> cfIds;
+    public final List<TableId> tableIds;
     public final Collection<Range<Token>> ranges;
 
     public final UUID parentRepairSession;
@@ -44,11 +45,11 @@ public class PrepareMessage extends RepairMessage
     public final long timestamp;
     public final boolean isGlobal;
 
-    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
+    public PrepareMessage(UUID parentRepairSession, List<TableId> tableIds, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
     {
         super(Type.PREPARE_MESSAGE, null);
         this.parentRepairSession = parentRepairSession;
-        this.cfIds = cfIds;
+        this.tableIds = tableIds;
         this.ranges = ranges;
         this.isIncremental = isIncremental;
         this.timestamp = timestamp;
@@ -66,23 +67,23 @@ public class PrepareMessage extends RepairMessage
                isIncremental == other.isIncremental &&
                isGlobal == other.isGlobal &&
                timestamp == other.timestamp &&
-               cfIds.equals(other.cfIds) &&
+               tableIds.equals(other.tableIds) &&
                ranges.equals(other.ranges);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, parentRepairSession, isGlobal, isIncremental, timestamp, cfIds, ranges);
+        return Objects.hash(messageType, parentRepairSession, isGlobal, isIncremental, timestamp, tableIds, ranges);
     }
 
     public static class PrepareMessageSerializer implements MessageSerializer<PrepareMessage>
     {
         public void serialize(PrepareMessage message, DataOutputPlus out, int version) throws IOException
         {
-            out.writeInt(message.cfIds.size());
-            for (UUID cfId : message.cfIds)
-                UUIDSerializer.serializer.serialize(cfId, out, version);
+            out.writeInt(message.tableIds.size());
+            for (TableId tableId : message.tableIds)
+                tableId.serialize(out);
             UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
             out.writeInt(message.ranges.size());
             for (Range<Token> r : message.ranges)
@@ -97,10 +98,10 @@ public class PrepareMessage extends RepairMessage
 
         public PrepareMessage deserialize(DataInputPlus in, int version) throws IOException
         {
-            int cfIdCount = in.readInt();
-            List<UUID> cfIds = new ArrayList<>(cfIdCount);
-            for (int i = 0; i < cfIdCount; i++)
-                cfIds.add(UUIDSerializer.serializer.deserialize(in, version));
+            int tableIdCount = in.readInt();
+            List<TableId> tableIds = new ArrayList<>(tableIdCount);
+            for (int i = 0; i < tableIdCount; i++)
+                tableIds.add(TableId.deserialize(in));
             UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version);
             int rangeCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
@@ -109,15 +110,15 @@ public class PrepareMessage extends RepairMessage
             boolean isIncremental = in.readBoolean();
             long timestamp = in.readLong();
             boolean isGlobal = in.readBoolean();
-            return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, timestamp, isGlobal);
+            return new PrepareMessage(parentRepairSession, tableIds, ranges, isIncremental, timestamp, isGlobal);
         }
 
         public long serializedSize(PrepareMessage message, int version)
         {
             long size;
-            size = TypeSizes.sizeof(message.cfIds.size());
-            for (UUID cfId : message.cfIds)
-                size += UUIDSerializer.serializer.serializedSize(cfId, version);
+            size = TypeSizes.sizeof(message.tableIds.size());
+            for (TableId tableId : message.tableIds)
+                size += tableId.serializedSize();
             size += UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
             size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> r : message.ranges)
@@ -133,12 +134,12 @@ public class PrepareMessage extends RepairMessage
     public String toString()
     {
         return "PrepareMessage{" +
-                "cfIds='" + cfIds + '\'' +
-                ", ranges=" + ranges +
-                ", parentRepairSession=" + parentRepairSession +
-                ", isIncremental="+isIncremental +
-                ", timestamp=" + timestamp +
-                ", isGlobal=" + isGlobal +
-                '}';
+               "tableIds='" + tableIds + '\'' +
+               ", ranges=" + ranges +
+               ", parentRepairSession=" + parentRepairSession +
+               ", isIncremental=" + isIncremental +
+               ", timestamp=" + timestamp +
+               ", isGlobal=" + isGlobal +
+               '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/ColumnMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ColumnMetadata.java b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
new file mode 100644
index 0000000..788351e
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.Collections2;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.cql3.selection.Selector;
+import org.apache.cassandra.cql3.selection.SimpleSelector;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public final class ColumnMetadata extends ColumnSpecification implements Selectable, Comparable<ColumnMetadata>
+{
+    public static final Comparator<Object> asymmetricColumnDataComparator =
+        (a, b) -> ((ColumnData) a).column().compareTo((ColumnMetadata) b);
+
+    public static final int NO_POSITION = -1;
+
+    public enum ClusteringOrder
+    {
+        ASC, DESC, NONE
+    }
+
+    /**
+     * The type of CQL3 column this definition represents.
+     * There are 4 main types of CQL3 columns: those that are part of the partition key,
+     * those that are clustering columns and, amongst the others, regular and
+     * static ones.
+     *
+     * IMPORTANT: this enum is serialized as toString() and deserialized by calling
+     * Kind.valueOf(), so do not override toString() or rename existing values.
+     */
+    public enum Kind
+    {
+        // NOTE: if adding a new type, must modify comparisonOrder
+        PARTITION_KEY,
+        CLUSTERING,
+        REGULAR,
+        STATIC;
+
+        public boolean isPrimaryKeyKind()
+        {
+            return this == PARTITION_KEY || this == CLUSTERING;
+        }
+
+    }
+
+    public final Kind kind;
+
+    /*
+     * If the column is a partition key or clustering column, its position relative to
+     * other columns of the same kind. Otherwise, NO_POSITION (-1).
+     *
+     * Note that partition key and clustering columns are numbered separately so
+     * the first clustering column is 0.
+     */
+    private final int position;
+
+    private final Comparator<CellPath> cellPathComparator;
+    private final Comparator<Object> asymmetricCellPathComparator;
+    private final Comparator<? super Cell> cellComparator;
+
+    private int hash;
+
+    /**
+     * These objects are compared frequently, so we encode several of their comparison components
+     * into a single long value so that this can be done efficiently
+     */
+    private final long comparisonOrder;
+
+    private static long comparisonOrder(Kind kind, boolean isComplex, long position, ColumnIdentifier name)
+    {
+        assert position >= 0 && position < 1 << 12;
+        return   (((long) kind.ordinal()) << 61)
+               | (isComplex ? 1L << 60 : 0)
+               | (position << 48)
+               | (name.prefixComparison >>> 16);
+    }
+
+    public static ColumnMetadata partitionKeyColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
+    {
+        return new ColumnMetadata(table, name, type, position, Kind.PARTITION_KEY);
+    }
+
+    public static ColumnMetadata partitionKeyColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
+    {
+        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.PARTITION_KEY);
+    }
+
+    public static ColumnMetadata clusteringColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
+    {
+        return new ColumnMetadata(table, name, type, position, Kind.CLUSTERING);
+    }
+
+    public static ColumnMetadata clusteringColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
+    {
+        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.CLUSTERING);
+    }
+
+    public static ColumnMetadata regularColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
+    {
+        return new ColumnMetadata(table, name, type, NO_POSITION, Kind.REGULAR);
+    }
+
+    public static ColumnMetadata regularColumn(String keyspace, String table, String name, AbstractType<?> type)
+    {
+        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.REGULAR);
+    }
+
+    public static ColumnMetadata staticColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
+    {
+        return new ColumnMetadata(table, name, type, NO_POSITION, Kind.STATIC);
+    }
+
+    public static ColumnMetadata staticColumn(String keyspace, String table, String name, AbstractType<?> type)
+    {
+        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.STATIC);
+    }
+
+    public ColumnMetadata(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
+    {
+        this(table.keyspace,
+             table.name,
+             ColumnIdentifier.getInterned(name, table.columnDefinitionNameComparator(kind)),
+             type,
+             position,
+             kind);
+    }
+
+    @VisibleForTesting
+    public ColumnMetadata(String ksName,
+                          String cfName,
+                          ColumnIdentifier name,
+                          AbstractType<?> type,
+                          int position,
+                          Kind kind)
+    {
+        super(ksName, cfName, name, type);
+        assert name != null && type != null && kind != null;
+        assert name.isInterned();
+        assert (position == NO_POSITION) == !kind.isPrimaryKeyKind(); // The position really only make sense for partition and clustering columns (and those must have one),
+                                                                      // so make sure we don't sneak it for something else since it'd break equals()
+        this.kind = kind;
+        this.position = position;
+        this.cellPathComparator = makeCellPathComparator(kind, type);
+        this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
+        this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b);
+        this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, position), name);
+    }
+
+    private static Comparator<CellPath> makeCellPathComparator(Kind kind, AbstractType<?> type)
+    {
+        // Only multi-cell types on non-primary-key columns get a path comparator; everything else gets null.
+        boolean addressedByPath = !kind.isPrimaryKeyKind() && type.isMultiCell();
+        if (!addressedByPath)
+            return null;
+
+        AbstractType<?> elementComparator = type.isCollection() ? ((CollectionType) type).nameComparator() : ((UserType) type).nameComparator();
+
+        return (left, right) ->
+        {
+            boolean bothNonEmpty = left.size() != 0 && right.size() != 0;
+            if (bothNonEmpty)
+            {
+                // This will get more complicated once we have non-frozen UDT and nested collections
+                assert left.size() == 1 && right.size() == 1;
+                return elementComparator.compare(left.get(0), right.get(0));
+            }
+            // At least one path is empty here, i.e. presumably a BOTTOM/TOP sentinel: BOTTOM sorts first, TOP last.
+            if (left == CellPath.BOTTOM)
+                return right == CellPath.BOTTOM ? 0 : -1;
+            if (left == CellPath.TOP)
+                return right == CellPath.TOP ? 0 : 1;
+            return right == CellPath.BOTTOM ? 1 : -1;
+        };
+    }
+
+    public ColumnMetadata copy()
+    {
+        return new ColumnMetadata(ksName, cfName, name, type, position, kind); // new instance, every field carried over unchanged
+    }
+
+    public ColumnMetadata withNewName(ColumnIdentifier newName)
+    {
+        return new ColumnMetadata(ksName, cfName, newName, type, position, kind); // new instance differing only by its name
+    }
+
+    public ColumnMetadata withNewType(AbstractType<?> newType)
+    {
+        return new ColumnMetadata(ksName, cfName, name, newType, position, kind); // new instance differing only by its type
+    }
+
+    public boolean isPartitionKey()
+    {
+        return kind == Kind.PARTITION_KEY; // enum constant, so identity comparison is enough
+    }
+
+    public boolean isClusteringColumn()
+    {
+        return kind == Kind.CLUSTERING; // also what clusteringOrder() checks before reporting ASC/DESC
+    }
+
+    public boolean isStatic()
+    {
+        return kind == Kind.STATIC; // enum constant, so identity comparison is enough
+    }
+
+    public boolean isRegular()
+    {
+        return kind == Kind.REGULAR; // enum constant, so identity comparison is enough
+    }
+
+    public ClusteringOrder clusteringOrder()
+    {
+        // The order is carried by the type itself: a reversed type means DESC.
+        if (isClusteringColumn())
+            return type.isReversed() ? ClusteringOrder.DESC : ClusteringOrder.ASC;
+        return ClusteringOrder.NONE; // anything that is not a clustering column has no order
+    }
+
+    public int position()
+    {
+        return position; // NO_POSITION unless kind.isPrimaryKeyKind(), as asserted by the constructor
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+
+        if (!(o instanceof ColumnMetadata))
+            return false;
+
+        ColumnMetadata that = (ColumnMetadata) o; // compared on the same six fields hashCode() uses
+        if (!Objects.equal(ksName, that.ksName) || !Objects.equal(cfName, that.cfName))
+            return false;
+        if (!Objects.equal(name, that.name) || !Objects.equal(type, that.type))
+            return false;
+
+        // position is a primitive int, so == is equivalent to the boxed comparison
+        return Objects.equal(kind, that.kind) && position == that.position;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        // Yields the same value as Objects.hashcode over these fields, without the object array allocation
+        // that features significantly in the allocation profile. The result is cached; 0 means "not computed yet".
+        int cached = hash;
+        if (cached != 0)
+            return cached;
+
+        int h = 31 + (ksName == null ? 0 : ksName.hashCode());
+        h = 31 * h + (cfName == null ? 0 : cfName.hashCode());
+        h = 31 * h + (name == null ? 0 : name.hashCode());
+        h = 31 * h + (type == null ? 0 : type.hashCode());
+        h = 31 * h + (kind == null ? 0 : kind.hashCode());
+        h = 31 * h + position;
+        hash = h;
+        return h;
+    }
+
+    @Override
+    public String toString()
+    {
+        return name.toString(); // just the column name; debugString() also reports type, kind and position
+    }
+
+    public String debugString()
+    {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this); // verbose counterpart of toString()
+        helper.add("name", name);
+        helper.add("type", type);
+        helper.add("kind", kind);
+        helper.add("position", position);
+        return helper.toString();
+    }
+
+    public boolean isPrimaryKeyColumn()
+    {
+        return kind.isPrimaryKeyKind(); // NOTE(review): presumably PARTITION_KEY or CLUSTERING - confirm against Kind
+    }
+
+    /**
+     * Maps each of the specified columns to its name.
+     *
+     * @param definitions the columns whose names are wanted.
+     * @return a transformed view of {@code definitions} exposing each column's identifier
+     */
+    public static Collection<ColumnIdentifier> toIdentifiers(Collection<ColumnMetadata> definitions)
+    {
+        return Collections2.transform(definitions, column -> column.name);
+    }
+
+    public int compareTo(ColumnMetadata other)
+    {
+        if (other == this)
+            return 0;
+
+        // comparisonOrder is the primary sort key; the name only breaks ties
+        return comparisonOrder == other.comparisonOrder
+             ? this.name.compareTo(other.name)
+             : Long.compare(comparisonOrder, other.comparisonOrder);
+    }
+
+    public Comparator<CellPath> cellPathComparator()
+    {
+        return cellPathComparator; // null for simple columns, see isComplex()
+    }
+
+    public Comparator<Object> asymmetricCellPathComparator()
+    {
+        return asymmetricCellPathComparator; // null for simple columns; otherwise expects a Cell on the left and a CellPath on the right
+    }
+
+    public Comparator<? super Cell> cellComparator()
+    {
+        return cellComparator; // orders cells by path for complex columns, ColumnData.comparator otherwise
+    }
+
+    public boolean isComplex()
+    {
+        return cellPathComparator != null; // complex == the constructor derived a cell path comparator for this column
+    }
+
+    public boolean isSimple()
+    {
+        return !isComplex(); // simple is defined purely as the negation of complex
+    }
+
+    public CellPath.Serializer cellPathSerializer()
+    {
+        // The collection cell path serializer is returned unconditionally, whatever the column type.
+        return CollectionType.cellPathSerializer; // NOTE(review): multi-cell UDTs are complex too (see makeCellPathComparator) - presumably meant to share it; confirm
+    }
+
+    public void validateCell(Cell cell)
+    {
+        boolean tombstone = cell.isTombstone();
+        if (!tombstone && type.isUDT())
+        {
+            // To validate a non-frozen UDT field, both the path and the value
+            // are needed, the path being an index into an array of value types.
+            ((UserType) type).validateCell(cell);
+            return;
+        }
+
+        // Tombstones must be empty; any other (non-UDT) cell has its value
+        // checked against the column type.
+        if (!tombstone)
+            type.validateCellValue(cell.value());
+        else if (cell.value().hasRemaining())
+            throw new MarshalException("A tombstone should not have a value");
+
+        // In both cases a path, when present, is validated separately.
+        if (cell.path() != null)
+            validateCellPath(cell.path());
+    }
+
+    private void validateCellPath(CellPath path)
+    {
+        if (!isComplex())
+            throw new MarshalException("Only complex cells should have a cell path");
+
+        assert type.isMultiCell();
+        AbstractType<?> elementType = type.isCollection()
+                                    ? ((CollectionType) type).nameComparator()
+                                    : ((UserType) type).nameComparator();
+        elementType.validate(path.get(0)); // only the first path element is checked
+    }
+
+    public static String toCQLString(Iterable<ColumnMetadata> defs)
+    {
+        return toCQLString(defs.iterator()); // convenience overload, delegates to the Iterator variant
+    }
+
+    public static String toCQLString(Iterator<ColumnMetadata> defs)
+    {
+        // Comma-separated column names ("" when empty), each rendered through ColumnIdentifier.toCQLString()
+        // rather than toString() so that a name requiring quotes is still emitted in its CQL form.
+        if (!defs.hasNext())
+            return "";
+        StringBuilder sb = new StringBuilder(defs.next().name.toCQLString());
+        while (defs.hasNext())
+            sb.append(", ").append(defs.next().name.toCQLString());
+        return sb.toString();
+    }
+
+    /**
+     * The type of the values held by the cells of this column.
+     *
+     * This is the same as the column type, except for collections where it is the
+     * 'valueComparator' of the collection.
+     */
+    public AbstractType<?> cellValueType()
+    {
+        if (type instanceof CollectionType)
+            return ((CollectionType) type).valueComparator();
+        return type;
+    }
+
+    public Selector.Factory newSelectorFactory(TableMetadata table, AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        return SimpleSelector.newFactory(this, addAndGetIndex(this, defs)); // table, expectedType and boundNames are not used here
+    }
+
+    public AbstractType<?> getExactTypeIfKnown(String keyspace)
+    {
+        return type; // always this column's own type; the keyspace argument is unused
+    }
+
+    /**
+     * Because legacy-created tables may have a non-text comparator, we cannot determine the proper 'key' until
+     * we know the comparator. ColumnMetadata.Raw is a placeholder that, once the table (and so the comparator) is
+     * known, can be resolved to a real ColumnIdentifier with getIdentifier() or to the column itself with prepare().
+     * This should only be used with identifiers that are actual column names. See CASSANDRA-8178 for more background.
+     */
+    public static abstract class Raw extends Selectable.Raw
+    {
+        /**
+         * Creates a {@code ColumnMetadata.Raw} from an unquoted identifier string.
+         */
+        public static Raw forUnquoted(String text)
+        {
+            return new Literal(text, false);
+        }
+
+        /**
+         * Creates a {@code ColumnMetadata.Raw} from a quoted identifier string.
+         */
+        public static Raw forQuoted(String text)
+        {
+            return new Literal(text, true);
+        }
+
+        /**
+         * Creates a {@code ColumnMetadata.Raw} from a pre-existing {@code ColumnMetadata}
+         * (useful in the rare cases where we already have the column but need
+         * a {@code ColumnMetadata.Raw} for typing purposes).
+         */
+        public static Raw forColumn(ColumnMetadata column)
+        {
+            return new ForColumn(column);
+        }
+
+        /**
+         * Get the identifier corresponding to this raw column, without assuming this is an
+         * existing column (unlike {@link Selectable.Raw#prepare}).
+         */
+        public abstract ColumnIdentifier getIdentifier(TableMetadata table);
+
+        public abstract String rawText();
+
+        @Override
+        public abstract ColumnMetadata prepare(TableMetadata table);
+
+        @Override
+        public boolean processesSelection()
+        {
+            return false;
+        }
+
+        @Override
+        public final int hashCode()
+        {
+            return toString().hashCode(); // hashed on toString(), consistent with equals() below
+        }
+
+        @Override
+        public final boolean equals(Object o)
+        {
+            if(!(o instanceof Raw))
+                return false;
+
+            Raw that = (Raw)o;
+            return this.toString().equals(that.toString()); // equal whenever the toString() forms match, whatever the concrete subclass
+        }
+
+        private static class Literal extends Raw
+        {
+            private final String text;
+
+            public Literal(String rawText, boolean keepCase)
+            {
+                this.text =  keepCase ? rawText : rawText.toLowerCase(Locale.US); // forUnquoted() passes keepCase == false, so unquoted names are lower-cased
+            }
+
+            public ColumnIdentifier getIdentifier(TableMetadata table)
+            {
+                if (!table.isStaticCompactTable())
+                    return ColumnIdentifier.getInterned(text, true);
+
+                AbstractType<?> columnNameType = table.staticCompactOrSuperTableColumnNameType();
+                if (columnNameType instanceof UTF8Type)
+                    return ColumnIdentifier.getInterned(text, true);
+
+                // We have a legacy-created table with a non-text comparator. Check if we have a matching column, otherwise assume we should use
+                // columnNameType
+                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
+                for (ColumnMetadata def : table.columns())
+                {
+                    if (def.name.bytes.equals(bufferName))
+                        return def.name;
+                }
+                return ColumnIdentifier.getInterned(columnNameType, columnNameType.fromString(text), text);
+            }
+
+            public ColumnMetadata prepare(TableMetadata table)
+            {
+                if (!table.isStaticCompactTable())
+                    return find(table);
+
+                AbstractType<?> columnNameType = table.staticCompactOrSuperTableColumnNameType();
+                if (columnNameType instanceof UTF8Type)
+                    return find(table);
+
+                // We have a legacy-created table with a non-text comparator. Check if we have a matching column, otherwise assume we should use
+                // columnNameType
+                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
+                for (ColumnMetadata def : table.columns())
+                {
+                    if (def.name.bytes.equals(bufferName))
+                        return def;
+                }
+                return find(columnNameType.fromString(text), table);
+            }
+
+            private ColumnMetadata find(TableMetadata table)
+            {
+                return find(ByteBufferUtil.bytes(text), table);
+            }
+
+            private ColumnMetadata find(ByteBuffer id, TableMetadata table)
+            {
+                ColumnMetadata def = table.getColumn(id);
+                if (def == null)
+                    throw new InvalidRequestException(String.format("Undefined column name %s", toString()));
+                return def;
+            }
+
+            public String rawText()
+            {
+                return text;
+            }
+
+            @Override
+            public String toString()
+            {
+                return ColumnIdentifier.maybeQuote(text);
+            }
+        }
+
+        // Used internally in the rare case where we need a ColumnMetadata.Raw for type-checking but
+        // actually already have the column itself.
+        private static class ForColumn extends Raw
+        {
+            private final ColumnMetadata column;
+
+            private ForColumn(ColumnMetadata column)
+            {
+                this.column = column;
+            }
+
+            public ColumnIdentifier getIdentifier(TableMetadata table)
+            {
+                return column.name;
+            }
+
+            public ColumnMetadata prepare(TableMetadata table)
+            {
+                assert table.getColumn(column.name) != null; // Sanity check that we're not doing something crazy
+                return column;
+            }
+
+            public String rawText()
+            {
+                return column.name.toString();
+            }
+
+            @Override
+            public String toString()
+            {
+                return column.name.toCQLString();
+            }
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/CompactionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java
index 73271f1..1a4a5f1 100644
--- a/src/java/org/apache/cassandra/schema/CompactionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompactionParams.java
@@ -249,7 +249,7 @@ public final class CompactionParams
         return create(classFromName(className), options);
     }
 
-    private static Class<? extends AbstractCompactionStrategy> classFromName(String name)
+    public static Class<? extends AbstractCompactionStrategy> classFromName(String name)
     {
         String className = name.contains(".")
                          ? name

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/DroppedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/DroppedColumn.java b/src/java/org/apache/cassandra/schema/DroppedColumn.java
new file mode 100644
index 0000000..90dfe65
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/DroppedColumn.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+public final class DroppedColumn
+{
+    // The dropped column and its drop timestamp (in microseconds, yet with millisecond granularity).
+    public final ColumnMetadata column;
+    public final long droppedTime;
+
+    public DroppedColumn(ColumnMetadata column, long droppedTime)
+    {
+        this.column = column;
+        this.droppedTime = droppedTime;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (!(o instanceof DroppedColumn))
+            return false;
+
+        DroppedColumn that = (DroppedColumn) o;
+        return column.equals(that.column) && droppedTime == that.droppedTime;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(column, droppedTime);
+    }
+
+    @Override
+    public String toString()
+    {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+        return helper.add("column", column).add("droppedTime", droppedTime).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Functions.java b/src/java/org/apache/cassandra/schema/Functions.java
index a936d81..8e3a3f1 100644
--- a/src/java/org/apache/cassandra/schema/Functions.java
+++ b/src/java/org/apache/cassandra/schema/Functions.java
@@ -22,9 +22,12 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
 
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.collect.Iterables.filter;
 
@@ -82,6 +85,28 @@ public final class Functions implements Iterable<Function>
         return stream().filter(f -> f instanceof UDAggregate).map(f -> (UDAggregate) f);
     }
 
+    MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff(Functions other)
+    {
+        // Index the UDFs of both sides by (name(), argumentsList()), then let Guava compute the difference.
+        Map<Pair<FunctionName, List<String>>, UDFunction> mine = new HashMap<>();
+        Map<Pair<FunctionName, List<String>>, UDFunction> theirs = new HashMap<>();
+
+        udfs().forEach(udf -> mine.put(Pair.create(udf.name(), udf.argumentsList()), udf));
+        other.udfs().forEach(udf -> theirs.put(Pair.create(udf.name(), udf.argumentsList()), udf));
+        return Maps.difference(mine, theirs);
+    }
+
+    MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff(Functions other)
+    {
+        // Index the UDAs of both sides by (name(), argumentsList()), then let Guava compute the difference.
+        Map<Pair<FunctionName, List<String>>, UDAggregate> mine = new HashMap<>();
+        Map<Pair<FunctionName, List<String>>, UDAggregate> theirs = new HashMap<>();
+
+        udas().forEach(uda -> mine.put(Pair.create(uda.name(), uda.argumentsList()), uda));
+        other.udas().forEach(uda -> theirs.put(Pair.create(uda.name(), uda.argumentsList()), uda));
+        return Maps.difference(mine, theirs);
+    }
+
     /**
      * @return a collection of aggregates that use the provided function as either a state or a final function
      * @param function the referree function
@@ -206,7 +231,7 @@ public final class Functions implements Iterable<Function>
         private Builder()
         {
             // we need deterministic iteration order; otherwise Functions.equals() breaks down
-            functions.orderValuesBy((f1, f2) -> Integer.compare(f1.hashCode(), f2.hashCode()));
+            functions.orderValuesBy(Comparator.comparingInt(Object::hashCode));
         }
 
         public Functions build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index b8c4854..190871a 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -31,10 +31,10 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.UnknownIndexException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -81,15 +81,14 @@ public final class IndexMetadata
         return new IndexMetadata(name, options, kind);
     }
 
-    public static IndexMetadata fromIndexTargets(CFMetaData cfm,
-                                                 List<IndexTarget> targets,
+    public static IndexMetadata fromIndexTargets(List<IndexTarget> targets,
                                                  String name,
                                                  Kind kind,
                                                  Map<String, String> options)
     {
         Map<String, String> newOptions = new HashMap<>(options);
         newOptions.put(IndexTarget.TARGET_OPTION_NAME, targets.stream()
-                                                              .map(target -> target.asCqlString(cfm))
+                                                              .map(target -> target.asCqlString())
                                                               .collect(Collectors.joining(", ")));
         return new IndexMetadata(name, newOptions, kind);
     }
@@ -107,7 +106,7 @@ public final class IndexMetadata
             return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + root + "_idx").replaceAll("");
     }
 
-    public void validate(CFMetaData cfm)
+    public void validate(TableMetadata table)
     {
         if (!isNameValid(name))
             throw new ConfigurationException("Illegal index name " + name);
@@ -122,29 +121,25 @@ public final class IndexMetadata
                                                                name, IndexTarget.CUSTOM_INDEX_OPTION_NAME));
             String className = options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
             Class<Index> indexerClass = FBUtilities.classForName(className, "custom indexer");
-            if(!Index.class.isAssignableFrom(indexerClass))
+            if (!Index.class.isAssignableFrom(indexerClass))
                 throw new ConfigurationException(String.format("Specified Indexer class (%s) does not implement the Indexer interface", className));
-            validateCustomIndexOptions(cfm, indexerClass, options);
+            validateCustomIndexOptions(table, indexerClass, options);
         }
     }
 
-    private void validateCustomIndexOptions(CFMetaData cfm,
-                                            Class<? extends Index> indexerClass,
-                                            Map<String, String> options)
-    throws ConfigurationException
+    private void validateCustomIndexOptions(TableMetadata table, Class<? extends Index> indexerClass, Map<String, String> options)
     {
         try
         {
-            Map<String, String> filteredOptions =
-                Maps.filterKeys(options,key -> !key.equals(IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+            Map<String, String> filteredOptions = Maps.filterKeys(options, key -> !key.equals(IndexTarget.CUSTOM_INDEX_OPTION_NAME));
 
             if (filteredOptions.isEmpty())
                 return;
 
-            Map<?,?> unknownOptions;
+            Map<?, ?> unknownOptions;
             try
             {
-                unknownOptions = (Map) indexerClass.getMethod("validateOptions", Map.class, CFMetaData.class).invoke(null, filteredOptions, cfm);
+                unknownOptions = (Map) indexerClass.getMethod("validateOptions", Map.class, TableMetadata.class).invoke(null, filteredOptions, table);
             }
             catch (NoSuchMethodException e)
             {
@@ -190,6 +185,7 @@ public final class IndexMetadata
         return kind == Kind.COMPOSITES;
     }
 
+    @Override
     public int hashCode()
     {
         return Objects.hashCode(id, name, kind, options);
@@ -198,9 +194,10 @@ public final class IndexMetadata
     public boolean equalsWithoutName(IndexMetadata other)
     {
         return Objects.equal(kind, other.kind)
-            && Objects.equal(options, other.options);
+               && Objects.equal(options, other.options);
     }
 
+    @Override
     public boolean equals(Object obj)
     {
         if (obj == this)
@@ -209,19 +206,25 @@ public final class IndexMetadata
         if (!(obj instanceof IndexMetadata))
             return false;
 
-        IndexMetadata other = (IndexMetadata)obj;
+        IndexMetadata other = (IndexMetadata) obj;
 
         return Objects.equal(id, other.id) && Objects.equal(name, other.name) && equalsWithoutName(other);
     }
 
+    @Override
     public String toString()
     {
         return new ToStringBuilder(this)
-            .append("id", id.toString())
-            .append("name", name)
-            .append("kind", kind)
-            .append("options", options)
-            .build();
+               .append("id", id.toString())
+               .append("name", name)
+               .append("kind", kind)
+               .append("options", options)
+               .build();
+    }
+
+    public String toCQLString()
+    {
+        return ColumnIdentifier.maybeQuote(name); // the index name as a CQL identifier; quoting is left to ColumnIdentifier.maybeQuote
     }
 
     public static class Serializer
@@ -231,10 +234,10 @@ public final class IndexMetadata
             UUIDSerializer.serializer.serialize(metadata.id, out, version);
         }
 
-        public IndexMetadata deserialize(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        public IndexMetadata deserialize(DataInputPlus in, int version, TableMetadata table) throws IOException
         {
             UUID id = UUIDSerializer.serializer.deserialize(in, version);
-            return cfm.getIndexes().get(id).orElseThrow(() -> new UnknownIndexException(cfm, id));
+            return table.indexes.get(id).orElseThrow(() -> new UnknownIndexException(table, id));
         }
 
         public long serializedSize(IndexMetadata metadata, int version)