You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:38 UTC

[32/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index 401442f..078438b 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -25,8 +25,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.Clustering;
@@ -37,6 +35,8 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -45,28 +45,22 @@ public abstract class Selection
     /**
      * A predicate that returns <code>true</code> for static columns.
      */
-    private static final Predicate<ColumnDefinition> STATIC_COLUMN_FILTER = new Predicate<ColumnDefinition>()
-    {
-        public boolean apply(ColumnDefinition def)
-        {
-            return def.isStatic();
-        }
-    };
+    private static final Predicate<ColumnMetadata> STATIC_COLUMN_FILTER = (column) -> column.isStatic();
 
-    private final CFMetaData cfm;
-    private final List<ColumnDefinition> columns;
+    private final TableMetadata table;
+    private final List<ColumnMetadata> columns;
     private final SelectionColumnMapping columnMapping;
     private final ResultSet.ResultMetadata metadata;
     private final boolean collectTimestamps;
     private final boolean collectTTLs;
 
-    protected Selection(CFMetaData cfm,
-                        List<ColumnDefinition> columns,
+    protected Selection(TableMetadata table,
+                        List<ColumnMetadata> columns,
                         SelectionColumnMapping columnMapping,
                         boolean collectTimestamps,
                         boolean collectTTLs)
     {
-        this.cfm = cfm;
+        this.table = table;
         this.columns = columns;
         this.columnMapping = columnMapping;
         this.metadata = new ResultSet.ResultMetadata(columnMapping.getColumnSpecifications());
@@ -86,7 +80,7 @@ public abstract class Selection
      */
     public boolean containsStaticColumns()
     {
-        if (!cfm.hasStaticColumns())
+        if (!table.hasStaticColumns())
             return false;
 
         if (isWildcard())
@@ -107,7 +101,7 @@ public abstract class Selection
         if (isWildcard())
             return false;
 
-        for (ColumnDefinition def : getColumns())
+        for (ColumnMetadata def : getColumns())
         {
             if (!def.isPartitionKey() && !def.isStatic())
                 return false;
@@ -126,19 +120,19 @@ public abstract class Selection
         return new ResultSet.ResultMetadata(Arrays.asList(jsonSpec));
     }
 
-    public static Selection wildcard(CFMetaData cfm)
+    public static Selection wildcard(TableMetadata table)
     {
-        List<ColumnDefinition> all = new ArrayList<>(cfm.allColumns().size());
-        Iterators.addAll(all, cfm.allColumnsInSelectOrder());
-        return new SimpleSelection(cfm, all, true);
+        List<ColumnMetadata> all = new ArrayList<>(table.columns().size());
+        Iterators.addAll(all, table.allColumnsInSelectOrder());
+        return new SimpleSelection(table, all, true);
     }
 
-    public static Selection forColumns(CFMetaData cfm, List<ColumnDefinition> columns)
+    public static Selection forColumns(TableMetadata table, List<ColumnMetadata> columns)
     {
-        return new SimpleSelection(cfm, columns, false);
+        return new SimpleSelection(table, columns, false);
     }
 
-    public int addColumnForOrdering(ColumnDefinition c)
+    public int addColumnForOrdering(ColumnMetadata c)
     {
         columns.add(c);
         metadata.addNonSerializedColumn(c);
@@ -159,17 +153,17 @@ public abstract class Selection
         return false;
     }
 
-    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors, VariableSpecifications boundNames, boolean hasGroupBy)
+    public static Selection fromSelectors(TableMetadata table, List<RawSelector> rawSelectors, VariableSpecifications boundNames, boolean hasGroupBy)
     {
-        List<ColumnDefinition> defs = new ArrayList<>();
+        List<ColumnMetadata> defs = new ArrayList<>();
 
         SelectorFactories factories =
-                SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, cfm), null, cfm, defs, boundNames);
-        SelectionColumnMapping mapping = collectColumnMappings(cfm, rawSelectors, factories);
+                SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, table), null, table, defs, boundNames);
+        SelectionColumnMapping mapping = collectColumnMappings(table, rawSelectors, factories);
 
         return (processesSelection(rawSelectors) || rawSelectors.size() != defs.size() || hasGroupBy)
-               ? new SelectionWithProcessing(cfm, defs, mapping, factories)
-               : new SimpleSelection(cfm, defs, mapping, false);
+               ? new SelectionWithProcessing(table, defs, mapping, factories)
+               : new SimpleSelection(table, defs, mapping, false);
     }
 
     /**
@@ -177,7 +171,7 @@ public abstract class Selection
      * @param c the column
      * @return the index of the specified column within the resultset or -1
      */
-    public int getResultSetIndex(ColumnDefinition c)
+    public int getResultSetIndex(ColumnMetadata c)
     {
         return getColumnIndex(c);
     }
@@ -187,7 +181,7 @@ public abstract class Selection
      * @param c the column
      * @return the index of the specified column or -1
      */
-    protected final int getColumnIndex(ColumnDefinition c)
+    protected final int getColumnIndex(ColumnMetadata c)
     {
         for (int i = 0, m = columns.size(); i < m; i++)
             if (columns.get(i).name.equals(c.name))
@@ -195,7 +189,7 @@ public abstract class Selection
         return -1;
     }
 
-    private static SelectionColumnMapping collectColumnMappings(CFMetaData cfm,
+    private static SelectionColumnMapping collectColumnMappings(TableMetadata table,
                                                                 List<RawSelector> rawSelectors,
                                                                 SelectorFactories factories)
     {
@@ -203,7 +197,7 @@ public abstract class Selection
         Iterator<RawSelector> iter = rawSelectors.iterator();
         for (Selector.Factory factory : factories)
         {
-            ColumnSpecification colSpec = factory.getColumnSpecification(cfm);
+            ColumnSpecification colSpec = factory.getColumnSpecification(table);
             ColumnIdentifier alias = iter.next().alias;
             factory.addColumnMapping(selectionColumns,
                                      alias == null ? colSpec : colSpec.withAlias(alias));
@@ -216,7 +210,7 @@ public abstract class Selection
     /**
      * @return the list of CQL3 columns value this SelectionClause needs.
      */
-    public List<ColumnDefinition> getColumns()
+    public List<ColumnMetadata> getColumns()
     {
         return columns;
     }
@@ -442,13 +436,13 @@ public abstract class Selection
     {
         private final boolean isWildcard;
 
-        public SimpleSelection(CFMetaData cfm, List<ColumnDefinition> columns, boolean isWildcard)
+        public SimpleSelection(TableMetadata table, List<ColumnMetadata> columns, boolean isWildcard)
         {
-            this(cfm, columns, SelectionColumnMapping.simpleMapping(columns), isWildcard);
+            this(table, columns, SelectionColumnMapping.simpleMapping(columns), isWildcard);
         }
 
-        public SimpleSelection(CFMetaData cfm,
-                               List<ColumnDefinition> columns,
+        public SimpleSelection(TableMetadata table,
+                               List<ColumnMetadata> columns,
                                SelectionColumnMapping metadata,
                                boolean isWildcard)
         {
@@ -457,7 +451,7 @@ public abstract class Selection
              * could filter those duplicate out of columns. But since we're very unlikely to
              * get much duplicate in practice, it's more efficient not to bother.
              */
-            super(cfm, columns, metadata, false, false);
+            super(table, columns, metadata, false, false);
             this.isWildcard = isWildcard;
         }
 
@@ -505,12 +499,12 @@ public abstract class Selection
     {
         private final SelectorFactories factories;
 
-        public SelectionWithProcessing(CFMetaData cfm,
-                                       List<ColumnDefinition> columns,
+        public SelectionWithProcessing(TableMetadata table,
+                                       List<ColumnMetadata> columns,
                                        SelectionColumnMapping metadata,
                                        SelectorFactories factories) throws InvalidRequestException
         {
-            super(cfm,
+            super(table,
                   columns,
                   metadata,
                   factories.containsWritetimeSelectorFactory(),
@@ -526,7 +520,7 @@ public abstract class Selection
         }
 
         @Override
-        public int getResultSetIndex(ColumnDefinition c)
+        public int getResultSetIndex(ColumnMetadata c)
         {
             int index = getColumnIndex(c);
 
@@ -541,7 +535,7 @@ public abstract class Selection
         }
 
         @Override
-        public int addColumnForOrdering(ColumnDefinition c)
+        public int addColumnForOrdering(ColumnMetadata c)
         {
             int index = super.addColumnForOrdering(c);
             factories.addSelectorForOrdering(c, index);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
index 5072066..cd04d94 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
@@ -26,7 +26,7 @@ import java.util.stream.Collectors;
 import com.google.common.base.Objects;
 import com.google.common.collect.*;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.ColumnSpecification;
 
 /**
@@ -38,7 +38,7 @@ import org.apache.cassandra.cql3.ColumnSpecification;
 public class SelectionColumnMapping implements SelectionColumns
 {
     private final ArrayList<ColumnSpecification> columnSpecifications;
-    private final HashMultimap<ColumnSpecification, ColumnDefinition> columnMappings;
+    private final HashMultimap<ColumnSpecification, ColumnMetadata> columnMappings;
 
     private SelectionColumnMapping()
     {
@@ -51,15 +51,15 @@ public class SelectionColumnMapping implements SelectionColumns
         return new SelectionColumnMapping();
     }
 
-    protected static SelectionColumnMapping simpleMapping(Iterable<ColumnDefinition> columnDefinitions)
+    protected static SelectionColumnMapping simpleMapping(Iterable<ColumnMetadata> columnDefinitions)
     {
         SelectionColumnMapping mapping = new SelectionColumnMapping();
-        for (ColumnDefinition def: columnDefinitions)
+        for (ColumnMetadata def: columnDefinitions)
             mapping.addMapping(def, def);
         return mapping;
     }
 
-    protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, ColumnDefinition column)
+    protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, ColumnMetadata column)
     {
         columnSpecifications.add(colSpec);
         // functions without arguments do not map to any column, so don't
@@ -69,7 +69,7 @@ public class SelectionColumnMapping implements SelectionColumns
         return this;
     }
 
-    protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, Iterable<ColumnDefinition> columns)
+    protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, Iterable<ColumnMetadata> columns)
     {
         columnSpecifications.add(colSpec);
         columnMappings.putAll(colSpec, columns);
@@ -83,7 +83,7 @@ public class SelectionColumnMapping implements SelectionColumns
         return Lists.newArrayList(columnSpecifications);
     }
 
-    public Multimap<ColumnSpecification, ColumnDefinition> getMappings()
+    public Multimap<ColumnSpecification, ColumnMetadata> getMappings()
     {
         return Multimaps.unmodifiableMultimap(columnMappings);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java b/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
index 151a2f3..f4a8593 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import com.google.common.collect.Multimap;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.ColumnSpecification;
 
 /**
@@ -34,5 +34,5 @@ import org.apache.cassandra.cql3.ColumnSpecification;
 public interface SelectionColumns
 {
     List<ColumnSpecification> getColumnSpecifications();
-    Multimap<ColumnSpecification, ColumnDefinition> getMappings();
+    Multimap<ColumnSpecification, ColumnMetadata> getMappings();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/Selector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java
index 6f83dfc..420af9c 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.cql3.selection;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -51,13 +51,13 @@ public abstract class Selector
          * Returns the column specification corresponding to the output value of the selector instances created by
          * this factory.
          *
-         * @param cfm the column family meta data
+         * @param table the table meta data
          * @return a column specification
          */
-        public final ColumnSpecification getColumnSpecification(CFMetaData cfm)
+        public final ColumnSpecification getColumnSpecification(TableMetadata table)
         {
-            return new ColumnSpecification(cfm.ksName,
-                                           cfm.cfName,
+            return new ColumnSpecification(table.keyspace,
+                                           table.name,
                                            new ColumnIdentifier(getColumnName(), true), // note that the name is not necessarily
                                                                                         // a true column name so we shouldn't intern it
                                            getReturnType());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
index 41bf193..25a1059 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectorFactories.java
@@ -21,8 +21,8 @@ import java.util.*;
 
 import com.google.common.collect.Lists;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.VariableSpecifications;
 import org.apache.cassandra.cql3.functions.Function;
@@ -63,7 +63,7 @@ final class SelectorFactories implements Iterable<Selector.Factory>
      * is any such expectations, or {@code null} otherwise. This will be {@code null} when called on
      * the top-level selectables, but may not be for selectable nested within a function for instance
      * (as the argument selectable will be expected to be of the type expected by the function).
-     * @param cfm the Column Family Definition
+     * @param table the table Definition
      * @param defs the collector parameter for the column definitions
      * @param boundNames the collector for the specification of bound markers in the selection
      * @return a new <code>SelectorFactories</code> instance
@@ -71,18 +71,18 @@ final class SelectorFactories implements Iterable<Selector.Factory>
      */
     public static SelectorFactories createFactoriesAndCollectColumnDefinitions(List<Selectable> selectables,
                                                                                List<AbstractType<?>> expectedTypes,
-                                                                               CFMetaData cfm,
-                                                                               List<ColumnDefinition> defs,
+                                                                               TableMetadata table,
+                                                                               List<ColumnMetadata> defs,
                                                                                VariableSpecifications boundNames)
                                                                                throws InvalidRequestException
     {
-        return new SelectorFactories(selectables, expectedTypes, cfm, defs, boundNames);
+        return new SelectorFactories(selectables, expectedTypes, table, defs, boundNames);
     }
 
     private SelectorFactories(List<Selectable> selectables,
                               List<AbstractType<?>> expectedTypes,
-                              CFMetaData cfm,
-                              List<ColumnDefinition> defs,
+                              TableMetadata table,
+                              List<ColumnMetadata> defs,
                               VariableSpecifications boundNames)
                               throws InvalidRequestException
     {
@@ -92,7 +92,7 @@ final class SelectorFactories implements Iterable<Selector.Factory>
         {
             Selectable selectable = selectables.get(i);
             AbstractType<?> expectedType = expectedTypes == null ? null : expectedTypes.get(i);
-            Factory factory = selectable.newSelectorFactory(cfm, expectedType, defs, boundNames);
+            Factory factory = selectable.newSelectorFactory(table, expectedType, defs, boundNames);
             containsWritetimeFactory |= factory.isWritetimeSelectorFactory();
             containsTTLFactory |= factory.isTTLSelectorFactory();
             if (factory.isAggregateSelectorFactory())
@@ -122,7 +122,7 @@ final class SelectorFactories implements Iterable<Selector.Factory>
      * @param def the column that is needed for ordering
      * @param index the index of the column definition in the Selection's list of columns
      */
-    public void addSelectorForOrdering(ColumnDefinition def, int index)
+    public void addSelectorForOrdering(ColumnMetadata def, int index)
     {
         factories.add(SimpleSelector.newFactory(def, index));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SetSelector.java b/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
index 34de078..2ee086e 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SetSelector.java
@@ -22,8 +22,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.Sets;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
index 8d5a305..cbd65a9 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.selection;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
@@ -35,7 +35,7 @@ public final class SimpleSelector extends Selector
     private ByteBuffer current;
     private boolean isSet;
 
-    public static Factory newFactory(final ColumnDefinition def, final int idx)
+    public static Factory newFactory(final ColumnMetadata def, final int idx)
     {
         return new Factory()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/TermSelector.java b/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
index 2b0e975..bdb4953 100644
--- a/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/TermSelector.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.selection;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.Term;
@@ -54,7 +54,7 @@ public class TermSelector extends Selector
 
             protected void addColumnMapping(SelectionColumnMapping mapping, ColumnSpecification resultColumn)
             {
-               mapping.addMapping(resultColumn, (ColumnDefinition)null);
+               mapping.addMapping(resultColumn, (ColumnMetadata)null);
             }
 
             public Selector newInstance(QueryOptions options)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/UserTypeSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/UserTypeSelector.java b/src/java/org/apache/cassandra/cql3/selection/UserTypeSelector.java
index 3c298b5..7600e1d 100644
--- a/src/java/org/apache/cassandra/cql3/selection/UserTypeSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/UserTypeSelector.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.FieldIdentifier;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -76,7 +76,7 @@ final class UserTypeSelector extends Selector
 
                 if (tmpMapping.getMappings().get(resultsColumn).isEmpty())
                     // add a null mapping for cases where the collection is empty
-                    mapping.addMapping(resultsColumn, (ColumnDefinition)null);
+                    mapping.addMapping(resultsColumn, (ColumnMetadata)null);
                 else
                     // collate the mapped columns from the child factories & add those
                     mapping.addMapping(resultsColumn, tmpMapping.getMappings().values());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
index 939f8c2..1e38337 100644
--- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.selection;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder;
@@ -37,7 +37,7 @@ final class WritetimeOrTTLSelector extends Selector
     private ByteBuffer current;
     private boolean isSet;
 
-    public static Factory newFactory(final ColumnDefinition def, final int idx, final boolean isWritetime)
+    public static Factory newFactory(final ColumnMetadata def, final int idx, final boolean isWritetime)
     {
         return new Factory()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index f3df6f2..68700f2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -18,14 +18,14 @@
 package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
 
@@ -54,7 +54,7 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
 
     public void validate(ClientState state) throws RequestValidationException
     {
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name);
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(name);
         if (ksm == null)
             throw new InvalidRequestException("Unknown keyspace " + name);
         if (SchemaConstants.isSystemKeyspace(ksm.name))
@@ -79,7 +79,7 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException
     {
-        KeyspaceMetadata oldKsm = Schema.instance.getKSMetaData(name);
+        KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(name);
         // In the (very) unlikely case the keyspace was dropped since validate()
         if (oldKsm == null)
             throw new InvalidRequestException("Unknown keyspace " + name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index f2f0698..35459de 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -23,7 +23,6 @@ import java.util.stream.Collectors;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -31,11 +30,16 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.DroppedColumn;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
 
@@ -48,7 +52,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
 
     public final Type oType;
     private final TableAttributes attrs;
-    private final Map<ColumnDefinition.Raw, ColumnDefinition.Raw> renames;
+    private final Map<ColumnMetadata.Raw, ColumnMetadata.Raw> renames;
     private final List<AlterTableStatementColumn> colNameList;
     private final Long deleteTimestamp;
 
@@ -56,7 +60,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                Type type,
                                List<AlterTableStatementColumn> colDataList,
                                TableAttributes attrs,
-                               Map<ColumnDefinition.Raw, ColumnDefinition.Raw> renames,
+                               Map<ColumnMetadata.Raw, ColumnMetadata.Raw> renames,
                                Long deleteTimestamp)
     {
         super(name);
@@ -79,32 +83,33 @@ public class AlterTableStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException
     {
-        CFMetaData meta = Validation.validateColumnFamily(keyspace(), columnFamily());
-        if (meta.isView())
+        TableMetadata current = Schema.instance.validateTable(keyspace(), columnFamily());
+        if (current.isView())
             throw new InvalidRequestException("Cannot use ALTER TABLE on Materialized View");
 
-        CFMetaData cfm = meta.copy();
+        TableMetadata.Builder builder = current.unbuild();
+
         ColumnIdentifier columnName = null;
-        ColumnDefinition def = null;
+        ColumnMetadata def = null;
         CQL3Type.Raw dataType = null;
         boolean isStatic = false;
         CQL3Type validator = null;
 
-        List<ViewDefinition> viewUpdates = null;
-        Iterable<ViewDefinition> views = View.findAll(keyspace(), columnFamily());
+        List<ViewMetadata> viewUpdates = new ArrayList<>();
+        Iterable<ViewMetadata> views = View.findAll(keyspace(), columnFamily());
 
         switch (oType)
         {
             case ALTER:
                 throw new InvalidRequestException("Altering of types is not allowed");
             case ADD:
-                if (cfm.isDense())
+                if (current.isDense())
                     throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
 
                 for (AlterTableStatementColumn colData : colNameList)
                 {
-                    columnName = colData.getColumnName().getIdentifier(cfm);
-                    def = cfm.getColumnDefinition(columnName);
+                    columnName = colData.getColumnName().getIdentifier(current);
+                    def = builder.getColumn(columnName);
                     dataType = colData.getColumnType();
                     assert dataType != null;
                     isStatic = colData.getStaticType();
@@ -113,9 +118,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
 
                     if (isStatic)
                     {
-                        if (!cfm.isCompound())
+                        if (!current.isCompound())
                             throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
-                        if (cfm.clusteringColumns().isEmpty())
+                        if (current.clusteringColumns().isEmpty())
                             throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
                     }
 
@@ -132,64 +137,54 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     }
 
                     // Cannot re-add a dropped counter column. See #7831.
-                    if (meta.isCounter() && meta.getDroppedColumns().containsKey(columnName.bytes))
+                    if (current.isCounter() && current.getDroppedColumn(columnName.bytes) != null)
                         throw new InvalidRequestException(String.format("Cannot re-add previously dropped counter column %s", columnName));
 
                     AbstractType<?> type = validator.getType();
                     if (type.isCollection() && type.isMultiCell())
                     {
-                        if (!cfm.isCompound())
+                        if (!current.isCompound())
                             throw new InvalidRequestException("Cannot use non-frozen collections in COMPACT STORAGE tables");
-                        if (cfm.isSuper())
+                        if (current.isSuper())
                             throw new InvalidRequestException("Cannot use non-frozen collections with super column families");
 
                         // If there used to be a non-frozen collection column with the same name (that has been dropped),
                         // we could still have some data using the old type, and so we can't allow adding a collection
                         // with the same name unless the types are compatible (see #6276).
-                        CFMetaData.DroppedColumn dropped = cfm.getDroppedColumns().get(columnName.bytes);
-                        if (dropped != null && dropped.type instanceof CollectionType
-                            && dropped.type.isMultiCell() && !type.isCompatibleWith(dropped.type))
+                        DroppedColumn dropped = current.droppedColumns.get(columnName.bytes);
+                        if (dropped != null && dropped.column.type instanceof CollectionType
+                            && dropped.column.type.isMultiCell() && !type.isCompatibleWith(dropped.column.type))
                         {
                             String message =
                                 String.format("Cannot add a collection with the name %s because a collection with the same name"
                                               + " and a different type (%s) has already been used in the past",
                                               columnName,
-                                              dropped.type.asCQL3Type());
+                                              dropped.column.type.asCQL3Type());
                             throw new InvalidRequestException(message);
                         }
                     }
 
-                    cfm.addColumnDefinition(isStatic
-                                            ? ColumnDefinition.staticDef(cfm, columnName.bytes, type)
-                                            : ColumnDefinition.regularDef(cfm, columnName.bytes, type));
+                    builder.addColumn(isStatic
+                                    ? ColumnMetadata.staticColumn(current, columnName.bytes, type)
+                                    : ColumnMetadata.regularColumn(current, columnName.bytes, type));
 
                     // Adding a column to a table which has an include all view requires the column to be added to the view
                     // as well
                     if (!isStatic)
-                    {
-                        for (ViewDefinition view : views)
-                        {
+                        for (ViewMetadata view : views)
                             if (view.includeAllColumns)
-                            {
-                                ViewDefinition viewCopy = view.copy();
-                                viewCopy.metadata.addColumnDefinition(ColumnDefinition.regularDef(viewCopy.metadata, columnName.bytes, type));
-                                if (viewUpdates == null)
-                                    viewUpdates = new ArrayList<>();
-                                viewUpdates.add(viewCopy);
-                            }
-                        }
-                    }
+                                viewUpdates.add(view.withAddedRegularColumn(ColumnMetadata.regularColumn(view.metadata, columnName.bytes, type)));
+
                 }
                 break;
-
             case DROP:
-                if (!cfm.isCQLTable())
+                if (!current.isCQLTable())
                     throw new InvalidRequestException("Cannot drop columns from a non-CQL3 table");
 
                 for (AlterTableStatementColumn colData : colNameList)
                 {
-                    columnName = colData.getColumnName().getIdentifier(cfm);
-                    def = cfm.getColumnDefinition(columnName);
+                    columnName = colData.getColumnName().getIdentifier(current);
+                    def = builder.getColumn(columnName);
 
                     if (def == null)
                         throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
@@ -201,53 +196,45 @@ public class AlterTableStatement extends SchemaAlteringStatement
                               throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName));
                          case REGULAR:
                          case STATIC:
-                              ColumnDefinition toDelete = null;
-                              for (ColumnDefinition columnDef : cfm.partitionColumns())
-                              {
-                                   if (columnDef.name.equals(columnName))
-                                   {
-                                       toDelete = columnDef;
-                                       break;
-                                   }
-                               }
-                             assert toDelete != null;
-                             cfm.removeColumnDefinition(toDelete);
-                             cfm.recordColumnDrop(toDelete, deleteTimestamp  == null ? queryState.getTimestamp() : deleteTimestamp);
+                             builder.removeRegularOrStaticColumn(def.name);
+                             builder.recordColumnDrop(def, deleteTimestamp  == null ? queryState.getTimestamp() : deleteTimestamp);
                              break;
                     }
 
                     // If the dropped column is required by any secondary indexes
                     // we reject the operation, as the indexes must be dropped first
-                    Indexes allIndexes = cfm.getIndexes();
+                    Indexes allIndexes = current.indexes;
                     if (!allIndexes.isEmpty())
                     {
-                        ColumnFamilyStore store = Keyspace.openAndGetStore(cfm);
+                        ColumnFamilyStore store = Keyspace.openAndGetStore(current);
                         Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def);
                         if (!dependentIndexes.isEmpty())
+                        {
                             throw new InvalidRequestException(String.format("Cannot drop column %s because it has " +
                                                                             "dependent secondary indexes (%s)",
                                                                             def,
                                                                             dependentIndexes.stream()
                                                                                             .map(i -> i.name)
                                                                                             .collect(Collectors.joining(","))));
+                        }
                     }
 
                     // If a column is dropped which is included in a view, we don't allow the drop to take place.
                     boolean rejectAlter = false;
-                    StringBuilder builder = new StringBuilder();
-                    for (ViewDefinition view : views)
+                    StringBuilder viewNames = new StringBuilder();
+                    for (ViewMetadata view : views)
                     {
                         if (!view.includes(columnName)) continue;
                         if (rejectAlter)
-                            builder.append(',');
+                            viewNames.append(',');
                         rejectAlter = true;
-                        builder.append(view.viewName);
+                        viewNames.append(view.name);
                     }
                     if (rejectAlter)
                         throw new InvalidRequestException(String.format("Cannot drop column %s, depended on by materialized views (%s.{%s})",
                                                                         columnName.toString(),
                                                                         keyspace(),
-                                                                        builder.toString()));
+                                                                        viewNames.toString()));
                 }
                 break;
             case OPTS:
@@ -255,7 +242,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     throw new InvalidRequestException("ALTER TABLE WITH invoked, but no parameters found");
                 attrs.validate();
 
-                TableParams params = attrs.asAlteredTableParams(cfm.params);
+                TableParams params = attrs.asAlteredTableParams(current.params);
 
                 if (!Iterables.isEmpty(views) && params.gcGraceSeconds == 0)
                 {
@@ -266,44 +253,62 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                                       "before being replayed.");
                 }
 
-                if (meta.isCounter() && params.defaultTimeToLive > 0)
+                if (current.isCounter() && params.defaultTimeToLive > 0)
                     throw new InvalidRequestException("Cannot set default_time_to_live on a table with counters");
 
-                cfm.params(params);
+                builder.params(params);
 
                 break;
             case RENAME:
-                for (Map.Entry<ColumnDefinition.Raw, ColumnDefinition.Raw> entry : renames.entrySet())
+                for (Map.Entry<ColumnMetadata.Raw, ColumnMetadata.Raw> entry : renames.entrySet())
                 {
-                    ColumnIdentifier from = entry.getKey().getIdentifier(cfm);
-                    ColumnIdentifier to = entry.getValue().getIdentifier(cfm);
-                    cfm.renameColumn(from, to);
+                    ColumnIdentifier from = entry.getKey().getIdentifier(current);
+                    ColumnIdentifier to = entry.getValue().getIdentifier(current);
 
-                    // If the view includes a renamed column, it must be renamed in the view table and the definition.
-                    for (ViewDefinition view : views)
+                    def = current.getColumn(from);
+                    if (def == null)
+                        throw new InvalidRequestException(String.format("Cannot rename unknown column %s in table %s", from, current.name));
+
+                    if (current.getColumn(to) != null)
+                        throw new InvalidRequestException(String.format("Cannot rename column %s to %s in table %s; another column of that name already exist", from, to, current.name));
+
+                    if (!def.isPrimaryKeyColumn())
+                        throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
+
+                    if (!current.indexes.isEmpty())
                     {
-                        if (!view.includes(from)) continue;
+                        ColumnFamilyStore store = Keyspace.openAndGetStore(current);
+                        Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def);
+                        if (!dependentIndexes.isEmpty())
+                            throw new InvalidRequestException(String.format("Cannot rename column %s because it has " +
+                                                                            "dependent secondary indexes (%s)",
+                                                                            from,
+                                                                            dependentIndexes.stream()
+                                                                                            .map(i -> i.name)
+                                                                                            .collect(Collectors.joining(","))));
+                    }
+
+                    builder.renamePrimaryKeyColumn(from, to);
 
-                        ViewDefinition viewCopy = view.copy();
-                        ColumnIdentifier viewFrom = entry.getKey().getIdentifier(viewCopy.metadata);
-                        ColumnIdentifier viewTo = entry.getValue().getIdentifier(viewCopy.metadata);
-                        viewCopy.renameColumn(viewFrom, viewTo);
+                    // If the view includes a renamed column, it must be renamed in the view table and the definition.
+                    for (ViewMetadata view : views)
+                    {
+                        if (!view.includes(from))
+                            continue;
 
-                        if (viewUpdates == null)
-                            viewUpdates = new ArrayList<>();
-                        viewUpdates.add(viewCopy);
+                        ColumnIdentifier viewFrom = entry.getKey().getIdentifier(view.metadata);
+                        ColumnIdentifier viewTo = entry.getValue().getIdentifier(view.metadata);
+                        viewUpdates.add(view.renamePrimaryKeyColumn(viewFrom, viewTo));
                     }
                 }
                 break;
         }
 
-        MigrationManager.announceColumnFamilyUpdate(cfm, isLocalOnly);
+        // FIXME: Should really be a single announce for the table and views.
+        MigrationManager.announceTableUpdate(builder.build(), isLocalOnly);
+        for (ViewMetadata viewUpdate : viewUpdates)
+            MigrationManager.announceViewUpdate(viewUpdate, isLocalOnly);
 
-        if (viewUpdates != null)
-        {
-            for (ViewDefinition viewUpdate : viewUpdates)
-                MigrationManager.announceViewUpdate(viewUpdate, isLocalOnly);
-        }
         return new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
index 7dea565..c1ba0d3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatementColumn.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.CQL3Type;
 
 /**
@@ -28,10 +28,10 @@ import org.apache.cassandra.cql3.CQL3Type;
 public class AlterTableStatementColumn
 {
     private final CQL3Type.Raw dataType;
-    private final ColumnDefinition.Raw colName;
+    private final ColumnMetadata.Raw colName;
     private final Boolean isStatic;
 
-    public AlterTableStatementColumn(ColumnDefinition.Raw colName, CQL3Type.Raw dataType, boolean isStatic)
+    public AlterTableStatementColumn(ColumnMetadata.Raw colName, CQL3Type.Raw dataType, boolean isStatic)
     {
         assert colName != null;
         this.dataType = dataType; // will be null when dropping columns, and never null otherwise (for ADD and ALTER).
@@ -39,12 +39,12 @@ public class AlterTableStatementColumn
         this.isStatic = isStatic;
     }
 
-    public AlterTableStatementColumn(ColumnDefinition.Raw colName, CQL3Type.Raw dataType)
+    public AlterTableStatementColumn(ColumnMetadata.Raw colName, CQL3Type.Raw dataType)
     {
         this(colName, dataType, false);
     }
 
-    public AlterTableStatementColumn(ColumnDefinition.Raw colName)
+    public AlterTableStatementColumn(ColumnMetadata.Raw colName)
     {
         this(colName, null, false);
     }
@@ -54,7 +54,7 @@ public class AlterTableStatementColumn
         return dataType;
     }
 
-    public ColumnDefinition.Raw getColumnName()
+    public ColumnMetadata.Raw getColumnName()
     {
         return colName;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
index 71d19fa..614b482 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTypeStatement.java
@@ -17,17 +17,16 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
 
@@ -86,7 +85,7 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
     {
-        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name.getKeyspace());
+        KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(name.getKeyspace());
         if (ksm == null)
             throw new InvalidRequestException(String.format("Cannot alter type in unknown keyspace %s", name.getKeyspace()));
 
@@ -100,134 +99,9 @@ public abstract class AlterTypeStatement extends SchemaAlteringStatement
         // but we also need to find all existing user types and CF using it and change them.
         MigrationManager.announceTypeUpdate(updated, isLocalOnly);
 
-        for (CFMetaData cfm : ksm.tables)
-        {
-            CFMetaData copy = cfm.copy();
-            boolean modified = false;
-            for (ColumnDefinition def : copy.allColumns())
-                modified |= updateDefinition(copy, def, toUpdate.keyspace, toUpdate.name, updated);
-            if (modified)
-                MigrationManager.announceColumnFamilyUpdate(copy, isLocalOnly);
-        }
-
-        for (ViewDefinition view : ksm.views)
-        {
-            ViewDefinition copy = view.copy();
-            boolean modified = false;
-            for (ColumnDefinition def : copy.metadata.allColumns())
-                modified |= updateDefinition(copy.metadata, def, toUpdate.keyspace, toUpdate.name, updated);
-            if (modified)
-                MigrationManager.announceViewUpdate(copy, isLocalOnly);
-        }
-
-        // Other user types potentially using the updated type
-        for (UserType ut : ksm.types)
-        {
-            // Re-updating the type we've just updated would be harmless but useless so we avoid it.
-            // Besides, we use the occasion to drop the old version of the type if it's a type rename
-            if (ut.keyspace.equals(toUpdate.keyspace) && ut.name.equals(toUpdate.name))
-            {
-                if (!ut.keyspace.equals(updated.keyspace) || !ut.name.equals(updated.name))
-                    MigrationManager.announceTypeDrop(ut);
-                continue;
-            }
-            AbstractType<?> upd = updateWith(ut, toUpdate.keyspace, toUpdate.name, updated);
-            if (upd != null)
-                MigrationManager.announceTypeUpdate((UserType) upd, isLocalOnly);
-        }
         return new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, keyspace(), name.getStringTypeName());
     }
 
-    private boolean updateDefinition(CFMetaData cfm, ColumnDefinition def, String keyspace, ByteBuffer toReplace, UserType updated)
-    {
-        AbstractType<?> t = updateWith(def.type, keyspace, toReplace, updated);
-        if (t == null)
-            return false;
-
-        // We need to update this validator ...
-        cfm.addOrReplaceColumnDefinition(def.withNewType(t));
-        return true;
-    }
-
-    // Update the provided type were all instance of a given userType is replaced by a new version
-    // Note that this methods reaches inside other UserType, CompositeType and CollectionType.
-    private static AbstractType<?> updateWith(AbstractType<?> type, String keyspace, ByteBuffer toReplace, UserType updated)
-    {
-        if (type instanceof UserType)
-        {
-            UserType ut = (UserType)type;
-
-            // If it's directly the type we've updated, then just use the new one.
-            if (keyspace.equals(ut.keyspace) && toReplace.equals(ut.name))
-                return type.isMultiCell() ? updated : updated.freeze();
-
-            // Otherwise, check for nesting
-            List<AbstractType<?>> updatedTypes = updateTypes(ut.fieldTypes(), keyspace, toReplace, updated);
-            return updatedTypes == null ? null : new UserType(ut.keyspace, ut.name, new ArrayList<>(ut.fieldNames()), updatedTypes, type.isMultiCell());
-        }
-        else if (type instanceof TupleType)
-        {
-            TupleType tt = (TupleType)type;
-            List<AbstractType<?>> updatedTypes = updateTypes(tt.allTypes(), keyspace, toReplace, updated);
-            return updatedTypes == null ? null : new TupleType(updatedTypes);
-        }
-        else if (type instanceof CompositeType)
-        {
-            CompositeType ct = (CompositeType)type;
-            List<AbstractType<?>> updatedTypes = updateTypes(ct.types, keyspace, toReplace, updated);
-            return updatedTypes == null ? null : CompositeType.getInstance(updatedTypes);
-        }
-        else if (type instanceof CollectionType)
-        {
-            if (type instanceof ListType)
-            {
-                AbstractType<?> t = updateWith(((ListType)type).getElementsType(), keyspace, toReplace, updated);
-                if (t == null)
-                    return null;
-                return ListType.getInstance(t, type.isMultiCell());
-            }
-            else if (type instanceof SetType)
-            {
-                AbstractType<?> t = updateWith(((SetType)type).getElementsType(), keyspace, toReplace, updated);
-                if (t == null)
-                    return null;
-                return SetType.getInstance(t, type.isMultiCell());
-            }
-            else
-            {
-                assert type instanceof MapType;
-                MapType mt = (MapType)type;
-                AbstractType<?> k = updateWith(mt.getKeysType(), keyspace, toReplace, updated);
-                AbstractType<?> v = updateWith(mt.getValuesType(), keyspace, toReplace, updated);
-                if (k == null && v == null)
-                    return null;
-                return MapType.getInstance(k == null ? mt.getKeysType() : k, v == null ? mt.getValuesType() : v, type.isMultiCell());
-            }
-        }
-        else
-        {
-            return null;
-        }
-    }
-
-    private static List<AbstractType<?>> updateTypes(List<AbstractType<?>> toUpdate, String keyspace, ByteBuffer toReplace, UserType updated)
-    {
-        // But this can also be nested.
-        List<AbstractType<?>> updatedTypes = null;
-        for (int i = 0; i < toUpdate.size(); i++)
-        {
-            AbstractType<?> t = updateWith(toUpdate.get(i), keyspace, toReplace, updated);
-            if (t == null)
-                continue;
-
-            if (updatedTypes == null)
-                updatedTypes = new ArrayList<>(toUpdate);
-
-            updatedTypes.set(i, t);
-        }
-        return updatedTypes;
-    }
-
     protected void checkTypeNotUsedByAggregate(KeyspaceMetadata ksm)
     {
         ksm.functions.udas().filter(aggregate -> aggregate.initialCondition() != null && aggregate.stateType().referencesUserType(name.getStringTypeName()))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
index e73d9bf..fbfc54c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterViewStatement.java
@@ -18,18 +18,18 @@
 package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.cql3.Validation;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
 
@@ -45,9 +45,9 @@ public class AlterViewStatement extends SchemaAlteringStatement
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        CFMetaData baseTable = View.findBaseTable(keyspace(), columnFamily());
+        TableMetadataRef baseTable = View.findBaseTable(keyspace(), columnFamily());
         if (baseTable != null)
-            state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.ALTER);
+            state.hasColumnFamilyAccess(keyspace(), baseTable.name, Permission.ALTER);
     }
 
     public void validate(ClientState state)
@@ -57,18 +57,18 @@ public class AlterViewStatement extends SchemaAlteringStatement
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException
     {
-        CFMetaData meta = Validation.validateColumnFamily(keyspace(), columnFamily());
+        TableMetadata meta = Schema.instance.validateTable(keyspace(), columnFamily());
         if (!meta.isView())
             throw new InvalidRequestException("Cannot use ALTER MATERIALIZED VIEW on Table");
 
-        ViewDefinition viewCopy = Schema.instance.getView(keyspace(), columnFamily()).copy();
+        ViewMetadata current = Schema.instance.getView(keyspace(), columnFamily());
 
         if (attrs == null)
             throw new InvalidRequestException("ALTER MATERIALIZED VIEW WITH invoked, but no parameters found");
 
         attrs.validate();
 
-        TableParams params = attrs.asAlteredTableParams(viewCopy.metadata.params);
+        TableParams params = attrs.asAlteredTableParams(current.metadata.params);
         if (params.gcGraceSeconds == 0)
         {
             throw new InvalidRequestException("Cannot alter gc_grace_seconds of a materialized view to 0, since this " +
@@ -83,9 +83,9 @@ public class AlterViewStatement extends SchemaAlteringStatement
                                               "the corresponding data in the parent table.");
         }
 
-        viewCopy.metadata.params(params);
+        ViewMetadata updated = current.copy(current.metadata.unbuild().params(params).build());
 
-        MigrationManager.announceViewUpdate(viewCopy, isLocalOnly);
+        MigrationManager.announceViewUpdate(updated, isLocalOnly);
         return new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 25e3e22..e181968 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -27,8 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.helpers.MessageFormatter;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
@@ -60,9 +61,9 @@ public class BatchStatement implements CQLStatement
     private final List<ModificationStatement> statements;
 
     // Columns modified for each table (keyed by the table ID)
-    private final Map<UUID, PartitionColumns> updatedColumns;
+    private final Map<TableId, RegularAndStaticColumns> updatedColumns;
     // Columns on which there is conditions. Note that if there is any, then the batch can only be on a single partition (and thus table).
-    private final PartitionColumns conditionColumns;
+    private final RegularAndStaticColumns conditionColumns;
 
     private final boolean updatesRegularRows;
     private final boolean updatesStaticRow;
@@ -98,13 +99,13 @@ public class BatchStatement implements CQLStatement
 
         boolean hasConditions = false;
         MultiTableColumnsBuilder regularBuilder = new MultiTableColumnsBuilder();
-        PartitionColumns.Builder conditionBuilder = PartitionColumns.builder();
+        RegularAndStaticColumns.Builder conditionBuilder = RegularAndStaticColumns.builder();
         boolean updateRegular = false;
         boolean updateStatic = false;
 
         for (ModificationStatement stmt : statements)
         {
-            regularBuilder.addAll(stmt.cfm, stmt.updatedColumns());
+            regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns());
             updateRegular |= stmt.updatesRegularRows();
             if (stmt.hasConditions())
             {
@@ -227,11 +228,11 @@ public class BatchStatement implements CQLStatement
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            if (isLogged() && statement.cfm.params.gcGraceSeconds == 0)
+            if (isLogged() && statement.metadata().params.gcGraceSeconds == 0)
             {
                 if (tablesWithZeroGcGs == null)
                     tablesWithZeroGcGs = new HashSet<>();
-                tablesWithZeroGcGs.add(String.format("%s.%s", statement.cfm.ksName, statement.cfm.cfName));
+                tablesWithZeroGcGs.add(statement.metadata.toString());
             }
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
@@ -278,7 +279,7 @@ public class BatchStatement implements CQLStatement
             for (IMutation mutation : mutations)
             {
                 for (PartitionUpdate update : mutation.getPartitionUpdates())
-                    tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+                    tableNames.add(update.metadata().toString());
             }
 
             long failThreshold = DatabaseDescriptor.getBatchSizeFailThreshold();
@@ -314,7 +315,7 @@ public class BatchStatement implements CQLStatement
                 {
                     keySet.add(update.partitionKey());
 
-                    tableNames.add(String.format("%s.%s", update.metadata().ksName, update.metadata().cfName));
+                    tableNames.add(update.metadata().toString());
                 }
             }
 
@@ -385,12 +386,12 @@ public class BatchStatement implements CQLStatement
     private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
-        Pair<CQL3CasRequest, Set<ColumnDefinition>> p = makeCasRequest(options, state);
+        Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state);
         CQL3CasRequest casRequest = p.left;
-        Set<ColumnDefinition> columnsWithConditions = p.right;
+        Set<ColumnMetadata> columnsWithConditions = p.right;
 
-        String ksName = casRequest.cfm.ksName;
-        String tableName = casRequest.cfm.cfName;
+        String ksName = casRequest.metadata.keyspace;
+        String tableName = casRequest.metadata.name;
 
         try (RowIterator result = StorageProxy.cas(ksName,
                                                    tableName,
@@ -411,12 +412,12 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    private Pair<CQL3CasRequest,Set<ColumnDefinition>> makeCasRequest(BatchQueryOptions options, QueryState state)
+    private Pair<CQL3CasRequest,Set<ColumnMetadata>> makeCasRequest(BatchQueryOptions options, QueryState state)
     {
         long now = state.getTimestamp();
         DecoratedKey key = null;
         CQL3CasRequest casRequest = null;
-        Set<ColumnDefinition> columnsWithConditions = new LinkedHashSet<>();
+        Set<ColumnMetadata> columnsWithConditions = new LinkedHashSet<>();
 
         for (int i = 0; i < statements.size(); i++)
         {
@@ -428,8 +429,8 @@ public class BatchStatement implements CQLStatement
                 throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
             if (key == null)
             {
-                key = statement.cfm.decorateKey(pks.get(0));
-                casRequest = new CQL3CasRequest(statement.cfm, key, true, conditionColumns, updatesRegularRows, updatesStaticRow);
+                key = statement.metadata().partitioner.decorateKey(pks.get(0));
+                casRequest = new CQL3CasRequest(statement.metadata(), key, true, conditionColumns, updatesRegularRows, updatesStaticRow);
             }
             else if (!key.getKey().equals(pks.get(0)))
             {
@@ -475,12 +476,12 @@ public class BatchStatement implements CQLStatement
 
     private ResultMessage executeInternalWithConditions(BatchQueryOptions options, QueryState state) throws RequestExecutionException, RequestValidationException
     {
-        Pair<CQL3CasRequest, Set<ColumnDefinition>> p = makeCasRequest(options, state);
+        Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state);
         CQL3CasRequest request = p.left;
-        Set<ColumnDefinition> columnsWithConditions = p.right;
+        Set<ColumnMetadata> columnsWithConditions = p.right;
 
-        String ksName = request.cfm.ksName;
-        String tableName = request.cfm.cfName;
+        String ksName = request.metadata.keyspace;
+        String tableName = request.metadata.name;
 
         try (RowIterator result = ModificationStatement.casInternal(request, state))
         {
@@ -544,10 +545,10 @@ public class BatchStatement implements CQLStatement
             BatchStatement batchStatement = new BatchStatement(boundNames.size(), type, statements, prepAttrs);
             batchStatement.validate();
 
-            // Use the CFMetadata of the first statement for partition key bind indexes.  If the statements affect
+            // Use the TableMetadata of the first statement for partition key bind indexes.  If the statements affect
             // multiple tables, we won't send partition key bind indexes.
             short[] partitionKeyBindIndexes = (haveMultipleCFs || batchStatement.statements.isEmpty())? null
-                                                              : boundNames.getPartitionKeyBindIndexes(batchStatement.statements.get(0).cfm);
+                                                              : boundNames.getPartitionKeyBindIndexes(batchStatement.statements.get(0).metadata());
 
             return new ParsedStatement.Prepared(batchStatement, boundNames, partitionKeyBindIndexes);
         }
@@ -555,23 +556,23 @@ public class BatchStatement implements CQLStatement
 
     private static class MultiTableColumnsBuilder
     {
-        private final Map<UUID, PartitionColumns.Builder> perTableBuilders = new HashMap<>();
+        private final Map<TableId, RegularAndStaticColumns.Builder> perTableBuilders = new HashMap<>();
 
-        public void addAll(CFMetaData table, PartitionColumns columns)
+        public void addAll(TableMetadata table, RegularAndStaticColumns columns)
         {
-            PartitionColumns.Builder builder = perTableBuilders.get(table.cfId);
+            RegularAndStaticColumns.Builder builder = perTableBuilders.get(table.id);
             if (builder == null)
             {
-                builder = PartitionColumns.builder();
-                perTableBuilders.put(table.cfId, builder);
+                builder = RegularAndStaticColumns.builder();
+                perTableBuilders.put(table.id, builder);
             }
             builder.addAll(columns);
         }
 
-        public Map<UUID, PartitionColumns> build()
+        public Map<TableId, RegularAndStaticColumns> build()
         {
-            Map<UUID, PartitionColumns> m = Maps.newHashMapWithExpectedSize(perTableBuilders.size());
-            for (Map.Entry<UUID, PartitionColumns.Builder> p : perTableBuilders.entrySet())
+            Map<TableId, RegularAndStaticColumns> m = Maps.newHashMapWithExpectedSize(perTableBuilders.size());
+            for (Map.Entry<TableId, RegularAndStaticColumns.Builder> p : perTableBuilders.entrySet())
                 m.put(p.getKey(), p.getValue().build());
             return m;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/Bound.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Bound.java b/src/java/org/apache/cassandra/cql3/statements/Bound.java
index 824743c..7682ac5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Bound.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Bound.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.schema.ColumnMetadata;
 
 public enum Bound
 {
@@ -33,12 +33,12 @@ public enum Bound
     /**
      * Reverses the bound if the column type is a reversed one.
      *
-     * @param columnDefinition the column definition
+     * @param columnMetadata the column definition
      * @return the bound reversed if the column type was a reversed one or the original bound
      */
-    public Bound reverseIfNeeded(ColumnDefinition columnDefinition)
+    public Bound reverseIfNeeded(ColumnMetadata columnMetadata)
     {
-        return columnDefinition.isReversedType() ? reverse() : this;
+        return columnMetadata.isReversedType() ? reverse() : this;
     }
 
     public Bound reverse()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 4cf2baf..c450160 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -22,7 +22,7 @@ import java.util.*;
 
 import com.google.common.collect.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
 import org.apache.cassandra.db.*;
@@ -40,10 +40,10 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CQL3CasRequest implements CASRequest
 {
-    public final CFMetaData cfm;
+    public final TableMetadata metadata;
     public final DecoratedKey key;
     public final boolean isBatch;
-    private final PartitionColumns conditionColumns;
+    private final RegularAndStaticColumns conditionColumns;
     private final boolean updatesRegularRows;
     private final boolean updatesStaticRow;
     private boolean hasExists; // whether we have an exist or if not exist condition
@@ -58,16 +58,16 @@ public class CQL3CasRequest implements CASRequest
 
     private final List<RowUpdate> updates = new ArrayList<>();
 
-    public CQL3CasRequest(CFMetaData cfm,
+    public CQL3CasRequest(TableMetadata metadata,
                           DecoratedKey key,
                           boolean isBatch,
-                          PartitionColumns conditionColumns,
+                          RegularAndStaticColumns conditionColumns,
                           boolean updatesRegularRows,
                           boolean updatesStaticRow)
     {
-        this.cfm = cfm;
+        this.metadata = metadata;
         this.key = key;
-        this.conditions = new TreeMap<>(cfm.comparator);
+        this.conditions = new TreeMap<>(metadata.comparator);
         this.isBatch = isBatch;
         this.conditionColumns = conditionColumns;
         this.updatesRegularRows = updatesRegularRows;
@@ -156,7 +156,7 @@ public class CQL3CasRequest implements CASRequest
         }
     }
 
-    private PartitionColumns columnsToRead()
+    private RegularAndStaticColumns columnsToRead()
     {
         // If all our conditions are columns conditions (IF x = ?), then it's enough to query
         // the columns from the conditions. If we have a IF EXISTS or IF NOT EXISTS however,
@@ -167,10 +167,10 @@ public class CQL3CasRequest implements CASRequest
         // case.
         if (hasExists)
         {
-            PartitionColumns allColumns = cfm.partitionColumns();
+            RegularAndStaticColumns allColumns = metadata.regularAndStaticColumns();
             Columns statics = updatesStaticRow ? allColumns.statics : Columns.NONE;
             Columns regulars = updatesRegularRows ? allColumns.regulars : Columns.NONE;
-            return new PartitionColumns(statics, regulars);
+            return new RegularAndStaticColumns(statics, regulars);
         }
         return conditionColumns;
     }
@@ -180,12 +180,12 @@ public class CQL3CasRequest implements CASRequest
         assert staticConditions != null || !conditions.isEmpty();
 
         // Fetch all columns, but query only the selected ones
-        ColumnFilter columnFilter = ColumnFilter.selection(cfm, columnsToRead());
+        ColumnFilter columnFilter = ColumnFilter.selection(metadata, columnsToRead());
 
         // With only a static condition, we still want to make the distinction between a non-existing partition and one
         // that exists (has some live data) but has not static content. So we query the first live row of the partition.
         if (conditions.isEmpty())
-            return SinglePartitionReadCommand.create(cfm,
+            return SinglePartitionReadCommand.create(metadata,
                                                      nowInSec,
                                                      columnFilter,
                                                      RowFilter.NONE,
@@ -194,7 +194,7 @@ public class CQL3CasRequest implements CASRequest
                                                      new ClusteringIndexSliceFilter(Slices.ALL, false));
 
         ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(conditions.navigableKeySet(), false);
-        return SinglePartitionReadCommand.create(cfm, nowInSec, key, columnFilter, filter);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, key, columnFilter, filter);
     }
 
     /**
@@ -219,9 +219,9 @@ public class CQL3CasRequest implements CASRequest
         return true;
     }
 
-    private PartitionColumns updatedColumns()
+    private RegularAndStaticColumns updatedColumns()
     {
-        PartitionColumns.Builder builder = PartitionColumns.builder();
+        RegularAndStaticColumns.Builder builder = RegularAndStaticColumns.builder();
         for (RowUpdate upd : updates)
             builder.addAll(upd.stmt.updatedColumns());
         return builder.build();
@@ -229,11 +229,11 @@ public class CQL3CasRequest implements CASRequest
 
     public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException
     {
-        PartitionUpdate update = new PartitionUpdate(cfm, key, updatedColumns(), conditions.size());
+        PartitionUpdate update = new PartitionUpdate(metadata, key, updatedColumns(), conditions.size());
         for (RowUpdate upd : updates)
             upd.applyUpdates(current, update);
 
-        Keyspace.openAndGetStore(cfm).indexManager.validate(update);
+        Keyspace.openAndGetStore(metadata).indexManager.validate(update);
         return update;
     }
 
@@ -261,7 +261,7 @@ public class CQL3CasRequest implements CASRequest
         public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException
         {
             Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
-            UpdateParameters params = new UpdateParameters(cfm, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+            UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
             stmt.addUpdateForKey(updates, clustering, params);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index c887255..ade2752 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -25,14 +25,14 @@ import java.util.List;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -160,7 +160,7 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
         if (!functionName.hasKeyspace())
             throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
 
-        Validation.validateKeyspaceNotSystem(functionName.keyspace);
+        Schema.validateKeyspaceNotSystem(functionName.keyspace);
 
         stateFunc = new FunctionName(functionName.keyspace, stateFunc.name);
         if (finalFunc != null)
@@ -203,7 +203,7 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement
         if (ifNotExists && orReplace)
             throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
 
-        if (Schema.instance.getKSMetaData(functionName.keyspace) == null)
+        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
             throw new InvalidRequestException(String.format("Cannot add aggregate '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index 96b9721..2e1f78c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -23,16 +23,15 @@ import java.util.List;
 
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.Validation;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.Functions;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.Event;
 
@@ -98,7 +97,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
         if (!functionName.hasKeyspace())
             throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session");
 
-        Validation.validateKeyspaceNotSystem(functionName.keyspace);
+        Schema.validateKeyspaceNotSystem(functionName.keyspace);
     }
 
     protected void grantPermissionsToCreator(QueryState state)
@@ -134,7 +133,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement
         if (ifNotExists && orReplace)
             throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");
 
-        if (Schema.instance.getKSMetaData(functionName.keyspace) == null)
+        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
             throw new InvalidRequestException(String.format("Cannot add function '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace));
     }