You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:28 UTC

[22/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index edbda1c..e276f62 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -23,8 +23,9 @@ import java.util.*;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
@@ -45,11 +46,11 @@ public class ViewUpdateGenerator
     private final View view;
     private final int nowInSec;
 
-    private final CFMetaData baseMetadata;
+    private final TableMetadata baseMetadata;
     private final DecoratedKey baseDecoratedKey;
     private final ByteBuffer[] basePartitionKey;
 
-    private final CFMetaData viewMetadata;
+    private final TableMetadata viewMetadata;
 
     private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
 
@@ -87,9 +88,9 @@ public class ViewUpdateGenerator
 
         this.baseMetadata = view.getDefinition().baseTableMetadata();
         this.baseDecoratedKey = basePartitionKey;
-        this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator());
+        this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.partitionKeyType);
 
-        this.viewMetadata = view.getDefinition().metadata;
+        this.viewMetadata = Schema.instance.getTableMetadata(view.getDefinition().metadata.id);
 
         this.currentViewEntryPartitionKey = new ByteBuffer[viewMetadata.partitionKeyColumns().size()];
         this.currentViewEntryBuilder = BTreeRow.sortedBuilder();
@@ -191,7 +192,7 @@ public class ViewUpdateGenerator
                  : (mergedHasLiveData ? UpdateAction.NEW_ENTRY : UpdateAction.NONE);
         }
 
-        ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
+        ColumnMetadata baseColumn = view.baseNonPKColumnsInViewPK.get(0);
         assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK";
         Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn);
         Cell after = mergedBaseRow.getCell(baseColumn);
@@ -237,7 +238,7 @@ public class ViewUpdateGenerator
 
         for (ColumnData data : baseRow)
         {
-            ColumnDefinition viewColumn = view.getViewColumn(data.column());
+            ColumnMetadata viewColumn = view.getViewColumn(data.column());
             // If that base table column is not denormalized in the view, we had nothing to do.
             // Alose, if it's part of the view PK it's already been taken into account in the clustering.
             if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
@@ -293,8 +294,8 @@ public class ViewUpdateGenerator
         PeekingIterator<ColumnData> existingIter = Iterators.peekingIterator(existingBaseRow.iterator());
         for (ColumnData mergedData : mergedBaseRow)
         {
-            ColumnDefinition baseColumn = mergedData.column();
-            ColumnDefinition viewColumn = view.getViewColumn(baseColumn);
+            ColumnMetadata baseColumn = mergedData.column();
+            ColumnMetadata viewColumn = view.getViewColumn(baseColumn);
             // If that base table column is not denormalized in the view, we had nothing to do.
             // Alose, if it's part of the view PK it's already been taken into account in the clustering.
             if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
@@ -397,9 +398,9 @@ public class ViewUpdateGenerator
     private void startNewUpdate(Row baseRow)
     {
         ByteBuffer[] clusteringValues = new ByteBuffer[viewMetadata.clusteringColumns().size()];
-        for (ColumnDefinition viewColumn : viewMetadata.primaryKeyColumns())
+        for (ColumnMetadata viewColumn : viewMetadata.primaryKeyColumns())
         {
-            ColumnDefinition baseColumn = view.getBaseColumn(viewColumn);
+            ColumnMetadata baseColumn = view.getBaseColumn(viewColumn);
             ByteBuffer value = getValueForPK(baseColumn, baseRow);
             if (viewColumn.isPartitionKey())
                 currentViewEntryPartitionKey[viewColumn.position()] = value;
@@ -457,7 +458,7 @@ public class ViewUpdateGenerator
                  : LivenessInfo.withExpirationTime(baseLiveness.timestamp(), ttl, expirationTime);
         }
 
-        ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
+        ColumnMetadata baseColumn = view.baseNonPKColumnsInViewPK.get(0);
         Cell cell = baseRow.getCell(baseColumn);
         assert isLive(cell) : "We shouldn't have got there if the base row had no associated entry";
 
@@ -486,7 +487,7 @@ public class ViewUpdateGenerator
         return timestamp;
     }
 
-    private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData)
+    private void addColumnData(ColumnMetadata viewColumn, ColumnData baseTableData)
     {
         assert viewColumn.isComplex() == baseTableData.column().isComplex();
         if (!viewColumn.isComplex())
@@ -501,7 +502,7 @@ public class ViewUpdateGenerator
             addCell(viewColumn, cell);
     }
 
-    private void addCell(ColumnDefinition viewColumn, Cell baseTableCell)
+    private void addCell(ColumnMetadata viewColumn, Cell baseTableCell)
     {
         assert !viewColumn.isPrimaryKeyColumn();
         currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn));
@@ -525,7 +526,7 @@ public class ViewUpdateGenerator
         {
             // We can't really know which columns of the view will be updated nor how many row will be updated for this key
             // so we rely on hopefully sane defaults.
-            update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.partitionColumns(), 4);
+            update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.regularAndStaticColumns(), 4);
             updates.put(partitionKey, update);
         }
         update.add(row);
@@ -537,10 +538,10 @@ public class ViewUpdateGenerator
                           ? currentViewEntryPartitionKey[0]
                           : CompositeType.build(currentViewEntryPartitionKey);
 
-        return viewMetadata.decorateKey(rawKey);
+        return viewMetadata.partitioner.decorateKey(rawKey);
     }
 
-    private ByteBuffer getValueForPK(ColumnDefinition column, Row row)
+    private ByteBuffer getValueForPK(ColumnMetadata column, Row row)
     {
         switch (column.kind)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 15e75fe..9235844 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -27,7 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.tokenallocator.TokenAllocation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 1271a5a..ff7f2f7 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -17,8 +17,8 @@
  */
 package org.apache.cassandra.dht;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -287,12 +287,12 @@ public class ByteOrderedPartitioner implements IPartitioner
 
         for (String ks : Schema.instance.getKeyspaces())
         {
-            for (CFMetaData cfmd : Schema.instance.getTablesAndViews(ks))
+            for (TableMetadata cfmd : Schema.instance.getTablesAndViews(ks))
             {
                 for (Range<Token> r : sortedRanges)
                 {
                     // Looping over every KS:CF:Range, get the splits size and add it to the count
-                    allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.cfName, r, 1).size());
+                    allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.name, r, 1).size());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 954b0af..16c5db1 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -23,13 +23,14 @@ import java.nio.charset.CharacterCodingException;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.CachedHashDecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -226,12 +227,12 @@ public class OrderPreservingPartitioner implements IPartitioner
 
         for (String ks : Schema.instance.getKeyspaces())
         {
-            for (CFMetaData cfmd : Schema.instance.getTablesAndViews(ks))
+            for (TableMetadata cfmd : Schema.instance.getTablesAndViews(ks))
             {
                 for (Range<Token> r : sortedRanges)
                 {
                     // Looping over every KS:CF:Range, get the splits size and add it to the count
-                    allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.cfName, r, cfmd.params.minIndexInterval).size());
+                    allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.name, r, cfmd.params.minIndexInterval).size());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/exceptions/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/UnknownIndexException.java b/src/java/org/apache/cassandra/exceptions/UnknownIndexException.java
new file mode 100644
index 0000000..fdc6840
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/UnknownIndexException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata
+ * can be found in the TableMetadata#indexes collection. Note that this is an internal exception and is not meant
+ * to be user facing, the node reading the ReadCommand should proceed as if no index id were present.
+ */
+public final class UnknownIndexException extends IOException
+{
+    public final UUID indexId; // the index id that could not be resolved; set from the constructor's id argument
+    public UnknownIndexException(TableMetadata metadata, UUID id) // both arguments are dereferenced below, so neither may be null
+    {
+        super(String.format("Unknown index %s for table %s", id.toString(), metadata.toString())); // message names the missing index id and the table it was looked up in
+        indexId = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/exceptions/UnknownTableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/UnknownTableException.java b/src/java/org/apache/cassandra/exceptions/UnknownTableException.java
new file mode 100644
index 0000000..2cd7aab
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/UnknownTableException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import java.io.IOException;
+
+import org.apache.cassandra.schema.TableId;
+
+public class UnknownTableException extends IOException // checked I/O exception carrying the id of a table that could not be resolved
+{
+    public final TableId id; // the unresolved table id; callers read it directly (e.g. to report or forward the unknown id)
+
+    public UnknownTableException(String msg, TableId id) // msg becomes the exception message verbatim; id is stored as-is with no null check
+    {
+        super(msg);
+        this.id = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index fd9ed00..0f44e0c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -28,8 +28,8 @@ import java.util.concurrent.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
+
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -39,6 +39,7 @@ import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.utils.NativeSSTableLoaderClient;
 import org.apache.cassandra.utils.OutputHandler;
@@ -171,7 +172,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
         if (loader == null)
         {
             ExternalClient externalClient = new ExternalClient(conf);
-            externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+            externalClient.setTableMetadata(TableMetadataRef.forOfflineTools(CreateTableStatement.parse(schema, keyspace).build()));
 
             loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 45a227b..9259042 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -29,7 +29,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TokenRange;
 
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index 17fbf5d..4e8f139 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -18,9 +18,7 @@
 package org.apache.cassandra.hints;
 
 import java.io.IOException;
-import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Throwables;
@@ -29,6 +27,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
 
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
@@ -89,9 +88,9 @@ public final class Hint
     {
         if (isLive())
         {
-            // filter out partition update for table that have been truncated since hint's creation
+            // filter out partition update for tables that have been truncated since hint's creation
             Mutation filtered = mutation;
-            for (UUID id : mutation.getColumnFamilyIds())
+            for (TableId id : mutation.getTableIds())
                 if (creationTime <= SystemKeyspace.getTruncatedAt(id))
                     filtered = filtered.without(id);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index 723ab6d..683b894 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -27,13 +27,14 @@ import javax.annotation.Nullable;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -58,7 +59,7 @@ public final class HintMessage
     final Hint hint;
 
     @Nullable // will usually be null, unless a hint deserialization fails due to an unknown table id
-    final UUID unknownTableID;
+    final TableId unknownTableID;
 
     HintMessage(UUID hostId, Hint hint)
     {
@@ -67,7 +68,7 @@ public final class HintMessage
         this.unknownTableID = null;
     }
 
-    HintMessage(UUID hostId, UUID unknownTableID)
+    HintMessage(UUID hostId, TableId unknownTableID)
     {
         this.hostId = hostId;
         this.hint = null;
@@ -122,10 +123,10 @@ public final class HintMessage
             {
                 return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version));
             }
-            catch (UnknownColumnFamilyException e)
+            catch (UnknownTableException e)
             {
                 in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead()));
-                return new HintMessage(hostId, e.cfId);
+                return new HintMessage(hostId, e.id);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index e0a73c1..6ede205 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -30,11 +30,10 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CLibrary;
 
 /**
  * A paged non-compressed hints reader that provides two iterators:
@@ -225,11 +224,11 @@ class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
                 hint = Hint.serializer.deserialize(input, descriptor.messagingVersion());
                 input.checkLimit(0);
             }
-            catch (UnknownColumnFamilyException e)
+            catch (UnknownTableException e)
             {
                 logger.warn("Failed to read a hint for {} - table with id {} is unknown in file {}",
                             descriptor.hostId,
-                            e.cfId,
+                            e.id,
                             descriptor.fileName());
                 input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index e254555..1b4573d 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -26,7 +26,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.function.BiFunction;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -123,7 +123,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
  *
  * The input is the map of index options supplied in the WITH clause of a CREATE INDEX statement.
  *
- * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm);}</pre>
+ * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options, TableMetadata metadata);}</pre>
  *
  * In this version, the base table's metadata is also supplied as an argument.
  * If both overloaded methods are provided, only the one including the base table's metadata will be invoked.
@@ -303,7 +303,7 @@ public interface Index
      * @return true if the index depends on the supplied column being present; false if the column may be
      *              safely dropped or modified without adversely affecting the index
      */
-    public boolean dependsOn(ColumnDefinition column);
+    public boolean dependsOn(ColumnMetadata column);
 
     /**
      * Called to determine whether this index can provide a searcher to execute a query on the
@@ -313,7 +313,7 @@ public interface Index
      * @param operator the operator of a search query predicate
      * @return true if this index is capable of supporting such expressions, false otherwise
      */
-    public boolean supportsExpression(ColumnDefinition column, Operator operator);
+    public boolean supportsExpression(ColumnMetadata column, Operator operator);
 
     /**
      * If the index supports custom search expressions using the
@@ -385,7 +385,7 @@ public interface Index
      * that type of transaction, ...).
      */
     public Indexer indexerFor(DecoratedKey key,
-                              PartitionColumns columns,
+                              RegularAndStaticColumns columns,
                               int nowInSec,
                               OpOrder.Group opGroup,
                               IndexTransaction.Type transactionType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 08b4f8b..f7b7d13 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.*;
@@ -54,12 +53,12 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.transactions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.service.pager.SinglePartitionPager;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -140,7 +139,7 @@ public class SecondaryIndexManager implements IndexRegistry
     public void reload()
     {
         // figure out what needs to be added and dropped.
-        Indexes tableIndexes = baseCfs.metadata.getIndexes();
+        Indexes tableIndexes = baseCfs.metadata().indexes;
         indexes.keySet()
                .stream()
                .filter(indexName -> !tableIndexes.has(indexName))
@@ -214,7 +213,7 @@ public class SecondaryIndexManager implements IndexRegistry
     }
 
 
-    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
+    public Set<IndexMetadata> getDependentIndexes(ColumnMetadata column)
     {
         if (indexes.isEmpty())
             return Collections.emptySet();
@@ -547,11 +546,11 @@ public class SecondaryIndexManager implements IndexRegistry
     public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
     {
         if (logger.isTraceEnabled())
-            logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
+            logger.trace("Indexing partition {}", baseCfs.metadata().partitionKeyType.getString(key.getKey()));
 
         if (!indexes.isEmpty())
         {
-            SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
+            SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata(),
                                                                                           FBUtilities.nowInSeconds(),
                                                                                           key);
             int nowInSec = cmd.nowInSec();
@@ -562,7 +561,7 @@ public class SecondaryIndexManager implements IndexRegistry
             {
                 try (ReadExecutionController controller = cmd.executionController();
                      OpOrder.Group writeGroup = Keyspace.writeOrder.start();
-                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, controller))
+                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
                 {
                     if (!page.hasNext())
                         break;
@@ -642,7 +641,7 @@ public class SecondaryIndexManager implements IndexRegistry
         if (meanCellsPerPartition <= 0)
             return DEFAULT_PAGE_SIZE;
 
-        int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size();
+        int columnsPerRow = baseCfs.metadata().regularColumns().size();
         if (columnsPerRow <= 0)
             return DEFAULT_PAGE_SIZE;
 
@@ -653,8 +652,8 @@ public class SecondaryIndexManager implements IndexRegistry
 
         logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})",
                      pageSize,
-                     baseCfs.metadata.ksName,
-                     baseCfs.metadata.cfName,
+                     baseCfs.metadata.keyspace,
+                     baseCfs.metadata.name,
                      meanPartitionSize,
                      meanCellsPerPartition,
                      meanRowsPerPartition,
@@ -855,25 +854,25 @@ public class SecondaryIndexManager implements IndexRegistry
      * Transaction for use when merging rows during compaction
      */
     public CompactionTransaction newCompactionTransaction(DecoratedKey key,
-                                                          PartitionColumns partitionColumns,
+                                                          RegularAndStaticColumns regularAndStaticColumns,
                                                           int versions,
                                                           int nowInSec)
     {
         // the check for whether there are any registered indexes is already done in CompactionIterator
-        return new IndexGCTransaction(key, partitionColumns, versions, nowInSec, listIndexes());
+        return new IndexGCTransaction(key, regularAndStaticColumns, versions, nowInSec, listIndexes());
     }
 
     /**
      * Transaction for use when removing partitions during cleanup
      */
     public CleanupTransaction newCleanupTransaction(DecoratedKey key,
-                                                    PartitionColumns partitionColumns,
+                                                    RegularAndStaticColumns regularAndStaticColumns,
                                                     int nowInSec)
     {
         if (!hasIndexes())
             return CleanupTransaction.NO_OP;
 
-        return new CleanupGCTransaction(key, partitionColumns, nowInSec, listIndexes());
+        return new CleanupGCTransaction(key, regularAndStaticColumns, nowInSec, listIndexes());
     }
 
     /**
@@ -935,7 +934,7 @@ public class SecondaryIndexManager implements IndexRegistry
                 {
                 }
 
-                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
                 {
                 }
 
@@ -986,7 +985,7 @@ public class SecondaryIndexManager implements IndexRegistry
     private static final class IndexGCTransaction implements CompactionTransaction
     {
         private final DecoratedKey key;
-        private final PartitionColumns columns;
+        private final RegularAndStaticColumns columns;
         private final int versions;
         private final int nowInSec;
         private final Collection<Index> indexes;
@@ -994,7 +993,7 @@ public class SecondaryIndexManager implements IndexRegistry
         private Row[] rows;
 
         private IndexGCTransaction(DecoratedKey key,
-                                   PartitionColumns columns,
+                                   RegularAndStaticColumns columns,
                                    int versions,
                                    int nowInSec,
                                    Collection<Index> indexes)
@@ -1029,7 +1028,7 @@ public class SecondaryIndexManager implements IndexRegistry
                 {
                 }
 
-                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
                 {
                 }
 
@@ -1089,7 +1088,7 @@ public class SecondaryIndexManager implements IndexRegistry
     private static final class CleanupGCTransaction implements CleanupTransaction
     {
         private final DecoratedKey key;
-        private final PartitionColumns columns;
+        private final RegularAndStaticColumns columns;
         private final int nowInSec;
         private final Collection<Index> indexes;
 
@@ -1097,7 +1096,7 @@ public class SecondaryIndexManager implements IndexRegistry
         private DeletionTime partitionDelete;
 
         private CleanupGCTransaction(DecoratedKey key,
-                                     PartitionColumns columns,
+                                     RegularAndStaticColumns columns,
                                      int nowInSec,
                                      Collection<Index> indexes)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/TargetParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/TargetParser.java b/src/java/org/apache/cassandra/index/TargetParser.java
index 849ad16..ec25259 100644
--- a/src/java/org/apache/cassandra/index/TargetParser.java
+++ b/src/java/org/apache/cassandra/index/TargetParser.java
@@ -22,8 +22,8 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -36,17 +36,17 @@ public class TargetParser
     private static final Pattern TWO_QUOTES = Pattern.compile("\"\"");
     private static final String QUOTE = "\"";
 
-    public static Pair<ColumnDefinition, IndexTarget.Type> parse(CFMetaData cfm, IndexMetadata indexDef)
+    public static Pair<ColumnMetadata, IndexTarget.Type> parse(TableMetadata metadata, IndexMetadata indexDef)
     {
         String target = indexDef.options.get("target");
         assert target != null : String.format("No target definition found for index %s", indexDef.name);
-        Pair<ColumnDefinition, IndexTarget.Type> result = parse(cfm, target);
+        Pair<ColumnMetadata, IndexTarget.Type> result = parse(metadata, target);
         if (result == null)
             throw new ConfigurationException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
         return result;
     }
 
-    public static Pair<ColumnDefinition, IndexTarget.Type> parse(CFMetaData cfm, String target)
+    public static Pair<ColumnMetadata, IndexTarget.Type> parse(TableMetadata metadata, String target)
     {
         // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
         // if not, then it must be a simple column name and implictly its type is VALUES
@@ -77,11 +77,11 @@ public class TargetParser
         }
 
         // if it's not a CQL table, we can't assume that the column name is utf8, so
-        // in that case we have to do a linear scan of the cfm's columns to get the matching one
-        if (cfm.isCQLTable())
-            return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
+        // in that case we have to do a linear scan of the table's columns to get the matching one
+        if (metadata.isCQLTable())
+            return Pair.create(metadata.getColumn(new ColumnIdentifier(columnName, true)), targetType);
         else
-            for (ColumnDefinition column : cfm.allColumns())
+            for (ColumnMetadata column : metadata.columns())
                 if (column.name.toString().equals(columnName))
                     return Pair.create(column, targetType);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 70aaf0d..af75906 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -33,8 +33,9 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.*;
@@ -76,7 +77,7 @@ public abstract class CassandraIndex implements Index
     public final ColumnFamilyStore baseCfs;
     protected IndexMetadata metadata;
     protected ColumnFamilyStore indexCfs;
-    protected ColumnDefinition indexedColumn;
+    protected ColumnMetadata indexedColumn;
     protected CassandraIndexFunctions functions;
 
     protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
@@ -91,7 +92,7 @@ public abstract class CassandraIndex implements Index
      * @param operator
      * @return
      */
-    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    protected boolean supportsOperator(ColumnMetadata indexedColumn, Operator operator)
     {
         return operator == Operator.EQ;
     }
@@ -145,14 +146,14 @@ public abstract class CassandraIndex implements Index
                                                   CellPath path,
                                                   ByteBuffer cellValue);
 
-    public ColumnDefinition getIndexedColumn()
+    public ColumnMetadata getIndexedColumn()
     {
         return indexedColumn;
     }
 
     public ClusteringComparator getIndexComparator()
     {
-        return indexCfs.metadata.comparator;
+        return indexCfs.metadata().comparator;
     }
 
     public ColumnFamilyStore getIndexCfs()
@@ -201,7 +202,6 @@ public abstract class CassandraIndex implements Index
     public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
     {
         return () -> {
-            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
             indexCfs.reload();
             return null;
         };
@@ -223,12 +223,12 @@ public abstract class CassandraIndex implements Index
     private void setMetadata(IndexMetadata indexDef)
     {
         metadata = indexDef;
-        Pair<ColumnDefinition, IndexTarget.Type> target = TargetParser.parse(baseCfs.metadata, indexDef);
+        Pair<ColumnMetadata, IndexTarget.Type> target = TargetParser.parse(baseCfs.metadata(), indexDef);
         functions = getFunctions(indexDef, target);
-        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
+        TableMetadataRef tableRef = TableMetadataRef.forOfflineTools(indexCfsMetadata(baseCfs.metadata(), indexDef));
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
-                                                             cfm.cfName,
-                                                             cfm,
+                                                             tableRef.name,
+                                                             tableRef,
                                                              baseCfs.getTracker().loadsstables);
         indexedColumn = target.left;
     }
@@ -247,12 +247,12 @@ public abstract class CassandraIndex implements Index
         return true;
     }
 
-    public boolean dependsOn(ColumnDefinition column)
+    public boolean dependsOn(ColumnMetadata column)
     {
         return indexedColumn.name.equals(column.name);
     }
 
-    public boolean supportsExpression(ColumnDefinition column, Operator operator)
+    public boolean supportsExpression(ColumnMetadata column, Operator operator)
     {
         return indexedColumn.name.equals(column.name)
                && supportsOperator(indexedColumn, operator);
@@ -338,7 +338,7 @@ public abstract class CassandraIndex implements Index
     }
 
     public Indexer indexerFor(final DecoratedKey key,
-                              final PartitionColumns columns,
+                              final RegularAndStaticColumns columns,
                               final int nowInSec,
                               final OpOrder.Group opGroup,
                               final IndexTransaction.Type transactionType)
@@ -618,11 +618,10 @@ public abstract class CassandraIndex implements Index
     {
         if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
             throw new InvalidRequestException(String.format(
-                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+                                                           "Cannot index value of size %d for index %s on %s(%s) (maximum allowed size=%d)",
                                                            value.remaining(),
                                                            metadata.name,
-                                                           baseCfs.metadata.ksName,
-                                                           baseCfs.metadata.cfName,
+                                                           baseCfs.metadata,
                                                            indexedColumn.name.toString(),
                                                            FBUtilities.MAX_UNSIGNED_SHORT));
     }
@@ -654,7 +653,7 @@ public abstract class CassandraIndex implements Index
 
     private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
     {
-        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+        return PartitionUpdate.singleRowUpdate(indexCfs.metadata(), valueKey, row);
     }
 
     private void invalidate()
@@ -697,8 +696,8 @@ public abstract class CassandraIndex implements Index
             if (sstables.isEmpty())
             {
                 logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
-                            baseCfs.metadata.ksName,
-                            baseCfs.metadata.cfName,
+                            baseCfs.metadata.keyspace,
+                            baseCfs.metadata.name,
                             metadata.name);
                 baseCfs.indexManager.markIndexBuilt(metadata.name);
                 return;
@@ -727,31 +726,27 @@ public abstract class CassandraIndex implements Index
     }
 
     /**
-     * Construct the CFMetadata for an index table, the clustering columns in the index table
+     * Construct the TableMetadata for an index table, the clustering columns in the index table
      * vary dependent on the kind of the indexed value.
      * @param baseCfsMetadata
      * @param indexMetadata
      * @return
      */
-    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
+    public static TableMetadata indexCfsMetadata(TableMetadata baseCfsMetadata, IndexMetadata indexMetadata)
     {
-        Pair<ColumnDefinition, IndexTarget.Type> target = TargetParser.parse(baseCfsMetadata, indexMetadata);
+        Pair<ColumnMetadata, IndexTarget.Type> target = TargetParser.parse(baseCfsMetadata, indexMetadata);
         CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
-        ColumnDefinition indexedColumn = target.left;
+        ColumnMetadata indexedColumn = target.left;
         AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
 
-        // Tables for legacy KEYS indexes are non-compound and dense
-        CFMetaData.Builder builder = indexMetadata.isKeys()
-                                     ? CFMetaData.Builder.create(baseCfsMetadata.ksName,
-                                                                 baseCfsMetadata.indexColumnFamilyName(indexMetadata),
-                                                                 true, false, false)
-                                     : CFMetaData.Builder.create(baseCfsMetadata.ksName,
-                                                                 baseCfsMetadata.indexColumnFamilyName(indexMetadata));
-
-        builder =  builder.withId(baseCfsMetadata.cfId)
-                          .withPartitioner(new LocalPartitioner(indexedValueType))
-                          .addPartitionKey(indexedColumn.name, indexedColumn.type)
-                          .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
+        TableMetadata.Builder builder =
+            TableMetadata.builder(baseCfsMetadata.keyspace, baseCfsMetadata.indexTableName(indexMetadata), baseCfsMetadata.id)
+                         // tables for legacy KEYS indexes are non-compound and dense
+                         .isDense(indexMetadata.isKeys())
+                         .isCompound(!indexMetadata.isKeys())
+                         .partitioner(new LocalPartitioner(indexedValueType))
+                         .addPartitionKeyColumn(indexedColumn.name, indexedColumn.type)
+                         .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
 
         if (indexMetadata.isKeys())
         {
@@ -759,16 +754,16 @@ public abstract class CassandraIndex implements Index
             // value column defined, even though it is never used
             CompactTables.DefaultNames names =
                 CompactTables.defaultNameGenerator(ImmutableSet.of(indexedColumn.name.toString(), "partition_key"));
-            builder = builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance);
+            builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance);
         }
         else
         {
             // The clustering columns for a table backing a COMPOSITES index are dependent
             // on the specific type of index (there are specializations for indexes on collections)
-            builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
+            utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
         }
 
-        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
+        return builder.build().updateIndexTableMetadata(baseCfsMetadata.params);
     }
 
     /**
@@ -779,16 +774,16 @@ public abstract class CassandraIndex implements Index
      */
     public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
     {
-        return getFunctions(indexMetadata, TargetParser.parse(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
+        return getFunctions(indexMetadata, TargetParser.parse(baseCfs.metadata(), indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
     }
 
     static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
-                                                Pair<ColumnDefinition, IndexTarget.Type> target)
+                                                Pair<ColumnMetadata, IndexTarget.Type> target)
     {
         if (indexDef.isKeys())
             return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 
-        ColumnDefinition indexedColumn = target.left;
+        ColumnMetadata indexedColumn = target.left;
         if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
         {
             switch (((CollectionType)indexedColumn.type).kind)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
index 8047e1d..3d500a1 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.index.internal;
 
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -46,14 +46,14 @@ public interface CassandraIndexFunctions
      * @param indexedColumn
      * @return
      */
-    default AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+    default AbstractType<?> getIndexedValueType(ColumnMetadata indexedColumn)
     {
         return indexedColumn.type;
     }
 
     /**
-     * Add the clustering columns for a specific type of index table to the a CFMetaData.Builder (which is being
-     * used to construct the index table's CFMetadata. In the default implementation, the clustering columns of the
+     * Add the clustering columns for a specific type of index table to a TableMetadata.Builder (which is being
+     * used to construct the index table's TableMetadata). In the default implementation, the clustering columns of the
      * index table hold the partition key and clustering columns of the base table. This is overridden in several cases:
      * * When the indexed value is itself a clustering column, in which case, we only need store the base table's
      *   *other* clustering values in the index - the indexed value being the index table's partition key
@@ -68,11 +68,11 @@ public interface CassandraIndexFunctions
      * @param cfDef
      * @return
      */
-    default CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
-                                                         CFMetaData baseMetadata,
-                                                         ColumnDefinition cfDef)
+    default TableMetadata.Builder addIndexClusteringColumns(TableMetadata.Builder builder,
+                                                            TableMetadata baseMetadata,
+                                                            ColumnMetadata cfDef)
     {
-        for (ColumnDefinition def : baseMetadata.clusteringColumns())
+        for (ColumnMetadata def : baseMetadata.clusteringColumns())
             builder.addClusteringColumn(def.name, def.type);
         return builder;
     }
@@ -104,21 +104,22 @@ public interface CassandraIndexFunctions
             return new ClusteringColumnIndex(baseCfs, indexMetadata);
         }
 
-        public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
-                                                            CFMetaData baseMetadata,
-                                                            ColumnDefinition columnDef)
+        public TableMetadata.Builder addIndexClusteringColumns(TableMetadata.Builder builder,
+                                                               TableMetadata baseMetadata,
+                                                               ColumnMetadata columnDef)
         {
-            List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
+            List<ColumnMetadata> cks = baseMetadata.clusteringColumns();
             for (int i = 0; i < columnDef.position(); i++)
             {
-                ColumnDefinition def = cks.get(i);
+                ColumnMetadata def = cks.get(i);
                 builder.addClusteringColumn(def.name, def.type);
             }
             for (int i = columnDef.position() + 1; i < cks.size(); i++)
             {
-                ColumnDefinition def = cks.get(i);
+                ColumnMetadata def = cks.get(i);
                 builder.addClusteringColumn(def.name, def.type);
             }
+
             return builder;
         }
     };
@@ -138,7 +139,7 @@ public interface CassandraIndexFunctions
             return new CollectionKeyIndex(baseCfs, indexMetadata);
         }
 
-        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        public AbstractType<?> getIndexedValueType(ColumnMetadata indexedColumn)
         {
             return ((CollectionType) indexedColumn.type).nameComparator();
         }
@@ -152,16 +153,16 @@ public interface CassandraIndexFunctions
             return new CollectionValueIndex(baseCfs, indexMetadata);
         }
 
-        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        public AbstractType<?> getIndexedValueType(ColumnMetadata indexedColumn)
         {
             return ((CollectionType)indexedColumn.type).valueComparator();
         }
 
-        public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
-                                                            CFMetaData baseMetadata,
-                                                            ColumnDefinition columnDef)
+        public TableMetadata.Builder addIndexClusteringColumns(TableMetadata.Builder builder,
+                                                               TableMetadata baseMetadata,
+                                                               ColumnMetadata columnDef)
         {
-            for (ColumnDefinition def : baseMetadata.clusteringColumns())
+            for (ColumnMetadata def : baseMetadata.clusteringColumns())
                 builder.addClusteringColumn(def.name, def.type);
 
             // collection key
@@ -177,7 +178,7 @@ public interface CassandraIndexFunctions
             return new CollectionEntryIndex(baseCfs, indexMetadata);
         }
 
-        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        public AbstractType<?> getIndexedValueType(ColumnMetadata indexedColumn)
         {
             CollectionType colType = (CollectionType)indexedColumn.type;
             return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
index 7b622e3..005e5b9 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
@@ -26,7 +26,8 @@ import java.util.NavigableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -76,8 +77,8 @@ public abstract class CassandraIndexSearcher implements Index.Searcher
     {
         ClusteringIndexFilter filter = makeIndexFilter(command);
         ColumnFamilyStore indexCfs = index.getBackingTable().get();
-        CFMetaData indexCfm = indexCfs.metadata;
-        return SinglePartitionReadCommand.create(indexCfm, command.nowInSec(), indexKey, ColumnFilter.all(indexCfm), filter)
+        TableMetadata indexMetadata = indexCfs.metadata();
+        return SinglePartitionReadCommand.create(indexMetadata, command.nowInSec(), indexKey, ColumnFilter.all(indexMetadata), filter)
                                          .queryMemtableAndDisk(indexCfs, executionController.indexReadController());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
index 811d857..103a1ee 100644
--- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@ -51,7 +51,7 @@ public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
 
     public CompactionInfo getCompactionInfo()
     {
-        return new CompactionInfo(cfs.metadata,
+        return new CompactionInfo(cfs.metadata(),
                 OperationType.INDEX_BUILD,
                 iter.getBytesRead(),
                 iter.getTotalBytes(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
index f207e9b..ab05a4e 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
@@ -76,7 +76,7 @@ public class ClusteringColumnIndex extends CassandraIndex
     public IndexEntry decodeEntry(DecoratedKey indexedValue,
                                   Row indexEntry)
     {
-        int ckCount = baseCfs.metadata.clusteringColumns().size();
+        int ckCount = baseCfs.metadata().clusteringColumns().size();
 
         Clustering clustering = indexEntry.clustering();
         CBuilder builder = CBuilder.create(baseCfs.getComparator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
index 1113600..efe84b6 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.index.internal.composites;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -57,7 +57,7 @@ public class CollectionEntryIndex extends CollectionKeyIndexBase
         ByteBuffer mapKey = components[0];
         ByteBuffer mapValue = components[1];
 
-        ColumnDefinition columnDef = indexedColumn;
+        ColumnMetadata columnDef = indexedColumn;
         Cell cell = data.getCell(columnDef, CellPath.create(mapKey));
         if (cell == null || !cell.isLive(nowInSec))
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
index 42c45e5..4fc20ae 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.index.internal.composites;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -56,7 +56,7 @@ public class CollectionKeyIndex extends CollectionKeyIndexBase
         return cell == null || !cell.isLive(nowInSec);
     }
 
-    public boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    public boolean supportsOperator(ColumnMetadata indexedColumn, Operator operator)
     {
         return operator == Operator.CONTAINS_KEY ||
                operator == Operator.CONTAINS && indexedColumn.type instanceof SetType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
index ef76870..fccf522 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
@@ -73,7 +73,7 @@ public abstract class CollectionKeyIndexBase extends CassandraIndex
             indexedEntryClustering = Clustering.STATIC_CLUSTERING;
         else
         {
-            int count = 1 + baseCfs.metadata.clusteringColumns().size();
+            int count = 1 + baseCfs.metadata().clusteringColumns().size();
             CBuilder builder = CBuilder.create(baseCfs.getComparator());
             for (int i = 0; i < count - 1; i++)
                 builder.add(clustering.get(i + 1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
index 5929e69..4f0f2df 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.index.internal.composites;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -67,7 +67,7 @@ public class CollectionValueIndex extends CassandraIndex
         // partition key is needed at query time.
         // In the non-static case, cell will be present during indexing but
         // not when searching (CASSANDRA-7525).
-        if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
+        if (prefix.size() == baseCfs.metadata().clusteringColumns().size() && path != null)
             builder.add(path.get(0));
 
         return builder;
@@ -94,14 +94,14 @@ public class CollectionValueIndex extends CassandraIndex
                                 indexedEntryClustering);
     }
 
-    public boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    public boolean supportsOperator(ColumnMetadata indexedColumn, Operator operator)
     {
         return operator == Operator.CONTAINS && !(indexedColumn.type instanceof SetType);
     }
 
     public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        ColumnDefinition columnDef = indexedColumn;
+        ColumnMetadata columnDef = indexedColumn;
         ComplexColumnData complexData = data.getComplexColumnData(columnDef);
         if (complexData == null)
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index b5e4a78..7ee3bb5 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
@@ -69,7 +69,7 @@ public class CompositesSearcher extends CassandraIndexSearcher
 
             private UnfilteredRowIterator next;
 
-            public CFMetaData metadata()
+            public TableMetadata metadata()
             {
                 return command.metadata();
             }
@@ -111,7 +111,7 @@ public class CompositesSearcher extends CassandraIndexSearcher
                     {
                         // If the index is on a static column, we just need to do a full read on the partition.
                         // Note that we want to re-use the command.columnFilter() in case of future change.
-                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                     command.nowInSec(),
                                                                     command.columnFilter(),
                                                                     RowFilter.NONE,
@@ -148,7 +148,7 @@ public class CompositesSearcher extends CassandraIndexSearcher
 
                         // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                         ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                     command.nowInSec(),
                                                                     command.columnFilter(),
                                                                     command.rowFilter(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
index 2c0b5aa..810571c 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
@@ -57,7 +57,7 @@ public class PartitionKeyIndex extends CassandraIndex
                                       CellPath path,
                                       ByteBuffer cellValue)
     {
-        CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator();
+        CompositeType keyComparator = (CompositeType)baseCfs.metadata().partitionKeyType;
         ByteBuffer[] components = keyComparator.split(partitionKey);
         return components[indexedColumn.position()];
     }
@@ -75,7 +75,7 @@ public class PartitionKeyIndex extends CassandraIndex
 
     public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
     {
-        int ckCount = baseCfs.metadata.clusteringColumns().size();
+        int ckCount = baseCfs.metadata().clusteringColumns().size();
         Clustering clustering = indexEntry.clustering();
         CBuilder builder = CBuilder.create(baseCfs.getComparator());
         for (int i = 0; i < ckCount; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
index d680253..20a1915 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
@@ -22,8 +22,9 @@ package org.apache.cassandra.index.internal.keys;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
@@ -39,17 +40,17 @@ public class KeysIndex extends CassandraIndex
         super(baseCfs, indexDef);
     }
 
-    public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
-                                                        CFMetaData baseMetadata,
-                                                        ColumnDefinition cfDef)
+    public TableMetadata.Builder addIndexClusteringColumns(TableMetadata.Builder builder,
+                                                           TableMetadataRef baseMetadata,
+                                                           ColumnMetadata cfDef)
     {
         // no additional clustering columns required
         return builder;
     }
 
     protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
-                                               ClusteringPrefix prefix,
-                                               CellPath path)
+                                                  ClusteringPrefix prefix,
+                                                  CellPath path)
     {
         CBuilder builder = CBuilder.create(getIndexComparator());
         builder.add(partitionKey);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index febb09f..2ab5345 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
@@ -31,6 +30,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.internal.CassandraIndexSearcher;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class KeysSearcher extends CassandraIndexSearcher
@@ -55,7 +55,7 @@ public class KeysSearcher extends CassandraIndexSearcher
         {
             private UnfilteredRowIterator next;
 
-            public CFMetaData metadata()
+            public TableMetadata metadata()
             {
                 return command.metadata();
             }
@@ -85,7 +85,7 @@ public class KeysSearcher extends CassandraIndexSearcher
                         continue;
 
                     ColumnFilter extendedFilter = getExtendedFilter(command.columnFilter());
-                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                                            command.nowInSec(),
                                                                                            extendedFilter,
                                                                                            command.rowFilter(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 4375964..5257cb7 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -52,7 +52,10 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -65,7 +68,7 @@ public class SASIIndex implements Index, INotificationConsumer
                                                        Set<Index> indexes,
                                                        Collection<SSTableReader> sstablesToRebuild)
         {
-            NavigableMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> sstables = new TreeMap<>((a, b) -> {
+            NavigableMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables = new TreeMap<>((a, b) -> {
                 return Integer.compare(a.descriptor.generation, b.descriptor.generation);
             });
 
@@ -77,7 +80,7 @@ public class SASIIndex implements Index, INotificationConsumer
                        sstablesToRebuild.stream()
                                         .filter((sstable) -> !sasi.index.hasSSTable(sstable))
                                         .forEach((sstable) -> {
-                                            Map<ColumnDefinition, ColumnIndex> toBuild = sstables.get(sstable);
+                                            Map<ColumnMetadata, ColumnIndex> toBuild = sstables.get(sstable);
                                             if (toBuild == null)
                                                 sstables.put(sstable, (toBuild = new HashMap<>()));
 
@@ -100,18 +103,18 @@ public class SASIIndex implements Index, INotificationConsumer
         this.baseCfs = baseCfs;
         this.config = config;
 
-        ColumnDefinition column = TargetParser.parse(baseCfs.metadata, config).left;
-        this.index = new ColumnIndex(baseCfs.metadata.getKeyValidator(), column, config);
+        ColumnMetadata column = TargetParser.parse(baseCfs.metadata(), config).left;
+        this.index = new ColumnIndex(baseCfs.metadata().partitionKeyType, column, config);
 
         Tracker tracker = baseCfs.getTracker();
         tracker.subscribe(this);
 
-        SortedMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> toRebuild = new TreeMap<>((a, b)
+        SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> toRebuild = new TreeMap<>((a, b)
                                                 -> Integer.compare(a.descriptor.generation, b.descriptor.generation));
 
         for (SSTableReader sstable : index.init(tracker.getView().liveSSTables()))
         {
-            Map<ColumnDefinition, ColumnIndex> perSSTable = toRebuild.get(sstable);
+            Map<ColumnMetadata, ColumnIndex> perSSTable = toRebuild.get(sstable);
             if (perSSTable == null)
                 toRebuild.put(sstable, (perSSTable = new HashMap<>()));
 
@@ -121,16 +124,16 @@ public class SASIIndex implements Index, INotificationConsumer
         CompactionManager.instance.submitIndexBuild(new SASIIndexBuilder(baseCfs, toRebuild));
     }
 
-    public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm)
+    public static Map<String, String> validateOptions(Map<String, String> options, TableMetadata metadata)
     {
-        if (!(cfm.partitioner instanceof Murmur3Partitioner))
+        if (!(metadata.partitioner instanceof Murmur3Partitioner))
             throw new ConfigurationException("SASI only supports Murmur3Partitioner.");
 
         String targetColumn = options.get("target");
         if (targetColumn == null)
             throw new ConfigurationException("unknown target column");
 
-        Pair<ColumnDefinition, IndexTarget.Type> target = TargetParser.parse(cfm, targetColumn);
+        Pair<ColumnMetadata, IndexTarget.Type> target = TargetParser.parse(metadata, targetColumn);
         if (target == null)
             throw new ConfigurationException("failed to retrieve target column for: " + targetColumn);
 
@@ -200,17 +203,17 @@ public class SASIIndex implements Index, INotificationConsumer
         return Optional.empty();
     }
 
-    public boolean indexes(PartitionColumns columns)
+    public boolean indexes(RegularAndStaticColumns columns)
     {
         return columns.contains(index.getDefinition());
     }
 
-    public boolean dependsOn(ColumnDefinition column)
+    public boolean dependsOn(ColumnMetadata column)
     {
         return index.getDefinition().compareTo(column) == 0;
     }
 
-    public boolean supportsExpression(ColumnDefinition column, Operator operator)
+    public boolean supportsExpression(ColumnMetadata column, Operator operator)
     {
         return dependsOn(column) && index.supports(operator);
     }
@@ -236,7 +239,7 @@ public class SASIIndex implements Index, INotificationConsumer
     public void validate(PartitionUpdate update) throws InvalidRequestException
     {}
 
-    public Indexer indexerFor(DecoratedKey key, PartitionColumns columns, int nowInSec, OpOrder.Group opGroup, IndexTransaction.Type transactionType)
+    public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, OpOrder.Group opGroup, IndexTransaction.Type transactionType)
     {
         return new Indexer()
         {
@@ -282,14 +285,14 @@ public class SASIIndex implements Index, INotificationConsumer
 
     public Searcher searcherFor(ReadCommand command) throws InvalidRequestException
     {
-        CFMetaData config = command.metadata();
-        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(config.cfId);
+        TableMetadata config = command.metadata();
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(config.id);
         return controller -> new QueryPlan(cfs, command, DatabaseDescriptor.getRangeRpcTimeout()).execute(controller);
     }
 
     public SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationType opType)
     {
-        return newWriter(baseCfs.metadata.getKeyValidator(), descriptor, Collections.singletonMap(index.getDefinition(), index), opType);
+        return newWriter(baseCfs.metadata().partitionKeyType, descriptor, Collections.singletonMap(index.getDefinition(), index), opType);
     }
 
     public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
@@ -336,7 +339,7 @@ public class SASIIndex implements Index, INotificationConsumer
 
     protected static PerSSTableIndexWriter newWriter(AbstractType<?> keyValidator,
                                                      Descriptor descriptor,
-                                                     Map<ColumnDefinition, ColumnIndex> indexes,
+                                                     Map<ColumnMetadata, ColumnIndex> indexes,
                                                      OperationType opType)
     {
         return new PerSSTableIndexWriter(keyValidator, descriptor, opType, indexes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index d50875a..a01e45b 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -24,7 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -49,12 +49,12 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
     private final ColumnFamilyStore cfs;
     private final UUID compactionId = UUIDGen.getTimeUUID();
 
-    private final SortedMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> sstables;
+    private final SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables;
 
     private long bytesProcessed = 0;
     private final long totalSizeInBytes;
 
-    public SASIIndexBuilder(ColumnFamilyStore cfs, SortedMap<SSTableReader, Map<ColumnDefinition, ColumnIndex>> sstables)
+    public SASIIndexBuilder(ColumnFamilyStore cfs, SortedMap<SSTableReader, Map<ColumnMetadata, ColumnIndex>> sstables)
     {
         long totalIndexBytes = 0;
         for (SSTableReader sstable : sstables.keySet())
@@ -67,18 +67,18 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
 
     public void build()
     {
-        AbstractType<?> keyValidator = cfs.metadata.getKeyValidator();
-        for (Map.Entry<SSTableReader, Map<ColumnDefinition, ColumnIndex>> e : sstables.entrySet())
+        AbstractType<?> keyValidator = cfs.metadata().partitionKeyType;
+        for (Map.Entry<SSTableReader, Map<ColumnMetadata, ColumnIndex>> e : sstables.entrySet())
         {
             SSTableReader sstable = e.getKey();
-            Map<ColumnDefinition, ColumnIndex> indexes = e.getValue();
+            Map<ColumnMetadata, ColumnIndex> indexes = e.getValue();
 
             try (RandomAccessReader dataFile = sstable.openDataReader())
             {
                 PerSSTableIndexWriter indexWriter = SASIIndex.newWriter(keyValidator, sstable.descriptor, indexes, OperationType.COMPACTION);
 
                 long previousKeyPosition = 0;
-                try (KeyIterator keys = new KeyIterator(sstable.descriptor, cfs.metadata))
+                try (KeyIterator keys = new KeyIterator(sstable.descriptor, cfs.metadata()))
                 {
                     while (keys.hasNext())
                     {
@@ -99,7 +99,7 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                             try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
                             {
                                 // if the row has statics attached, it has to be indexed separately
-                                if (cfs.metadata.hasStaticColumns())
+                                if (cfs.metadata().hasStaticColumns())
                                     indexWriter.nextUnfilteredCluster(partition.staticRow());
 
                                 while (partition.hasNext())
@@ -123,7 +123,7 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
 
     public CompactionInfo getCompactionInfo()
     {
-        return new CompactionInfo(cfs.metadata,
+        return new CompactionInfo(cfs.metadata(),
                                   OperationType.INDEX_BUILD,
                                   bytesProcessed,
                                   totalSizeInBytes,