You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/01/27 22:18:23 UTC

[17/37] cassandra git commit: Make TableMetadata immutable, optimize Schema

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
new file mode 100644
index 0000000..44b1f8a
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -0,0 +1,956 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.github.jamm.Unmetered;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
+
+@Unmetered
+public final class TableMetadata
+{
+    public enum Flag
+    {
+        SUPER, COUNTER, DENSE, COMPOUND;
+
+        public static Set<Flag> fromStringSet(Set<String> strings)
+        {
+            return strings.stream().map(String::toUpperCase).map(Flag::valueOf).collect(toSet());
+        }
+
+        public static Set<String> toStringSet(Set<Flag> flags)
+        {
+            return flags.stream().map(Flag::toString).map(String::toLowerCase).collect(toSet());
+        }
+    }
+
+    public final String keyspace;
+    public final String name;
+    public final TableId id;
+
+    public final IPartitioner partitioner;
+    public final TableParams params;
+    public final ImmutableSet<Flag> flags;
+
+    private final boolean isView;
+    private final String indexName; // derived from table name
+
+    /*
+     * All CQL3 column definitions are stored in the columns map.
+     * On top of that, we keep a separate collection for each kind of definition, to
+     * 1) allow easy access to each kind and
+     * 2) for the partition key and clustering key ones, keep those lists ordered by the "component index" of the elements.
+     */
+    public final ImmutableMap<ByteBuffer, DroppedColumn> droppedColumns;
+    final ImmutableMap<ByteBuffer, ColumnMetadata> columns;
+
+    private final ImmutableList<ColumnMetadata> partitionKeyColumns;
+    private final ImmutableList<ColumnMetadata> clusteringColumns;
+    private final RegularAndStaticColumns regularAndStaticColumns;
+
+    public final Indexes indexes;
+    public final Triggers triggers;
+
+    // derived automatically from flags and columns
+    public final AbstractType<?> partitionKeyType;
+    public final ClusteringComparator comparator;
+
+    /*
+     * For dense tables, this aliases the single non-PK column the table contains (since it can only have one). We keep
+     * that as a convenience to access that column more easily (but we could replace calls by regularAndStaticColumns().iterator().next()
+     * for those tables in practice).
+     */
+    public final ColumnMetadata compactValueColumn;
+
+    // performance hacks; TODO see if all are really necessary
+    public final DataResource resource;
+
+    private TableMetadata(Builder builder)
+    {
+        keyspace = builder.keyspace;
+        name = builder.name;
+        id = builder.id;
+
+        partitioner = builder.partitioner;
+        params = builder.params.build();
+        flags = Sets.immutableEnumSet(builder.flags);
+        isView = builder.isView;
+
+        indexName = name.contains(".")
+                  ? name.substring(name.indexOf('.') + 1)
+                  : null;
+
+        droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
+        Collections.sort(builder.partitionKeyColumns);
+        partitionKeyColumns = ImmutableList.copyOf(builder.partitionKeyColumns);
+        Collections.sort(builder.clusteringColumns);
+        clusteringColumns = ImmutableList.copyOf(builder.clusteringColumns);
+        regularAndStaticColumns = RegularAndStaticColumns.builder().addAll(builder.regularAndStaticColumns).build();
+        columns = ImmutableMap.copyOf(builder.columns);
+
+        indexes = builder.indexes;
+        triggers = builder.triggers;
+
+        partitionKeyType = partitionKeyColumns.size() == 1
+                         ? partitionKeyColumns.get(0).type
+                         : CompositeType.getInstance(transform(partitionKeyColumns, t -> t.type));
+
+        comparator = new ClusteringComparator(transform(clusteringColumns, c -> c.type));
+
+        compactValueColumn = isCompactTable()
+                           ? CompactTables.getCompactValueColumn(regularAndStaticColumns, isSuper())
+                           : null;
+
+        resource = DataResource.table(keyspace, name);
+    }
+
+    public static Builder builder(String keyspace, String table)
+    {
+        return new Builder(keyspace, table);
+    }
+
+    public static Builder builder(String keyspace, String table, TableId id)
+    {
+        return new Builder(keyspace, table, id);
+    }
+
+    public Builder unbuild()
+    {
+        return builder(keyspace, name, id)
+               .partitioner(partitioner)
+               .params(params)
+               .flags(flags)
+               .isView(isView)
+               .addColumns(columns())
+               .droppedColumns(droppedColumns)
+               .indexes(indexes)
+               .triggers(triggers);
+    }
+
+    public boolean isView()
+    {
+        return isView;
+    }
+
+    public boolean isIndex()
+    {
+        return indexName != null;
+    }
+
+    public Optional<String> indexName()
+    {
+        return Optional.ofNullable(indexName);
+    }
+
+    /*
+     * We call dense a CF for which each component of the comparator is a clustering column, i.e. no
+     * component is used to store regular column names. In other words, non-composite static "thrift"
+     * and CQL3 CFs are *not* dense.
+     */
+    public boolean isDense()
+    {
+        return flags.contains(Flag.DENSE);
+    }
+
+    public boolean isCompound()
+    {
+        return flags.contains(Flag.COMPOUND);
+    }
+
+    public boolean isSuper()
+    {
+        return flags.contains(Flag.SUPER);
+    }
+
+    public boolean isCounter()
+    {
+        return flags.contains(Flag.COUNTER);
+    }
+
+    public boolean isCQLTable()
+    {
+        return !isSuper() && !isDense() && isCompound();
+    }
+
+    public boolean isCompactTable()
+    {
+        return !isCQLTable();
+    }
+
+    public boolean isStaticCompactTable()
+    {
+        return !isSuper() && !isDense() && !isCompound();
+    }
+
+    public ImmutableCollection<ColumnMetadata> columns()
+    {
+        return columns.values();
+    }
+
+    public Iterable<ColumnMetadata> primaryKeyColumns()
+    {
+        return Iterables.concat(partitionKeyColumns, clusteringColumns);
+    }
+
+    public ImmutableList<ColumnMetadata> partitionKeyColumns()
+    {
+        return partitionKeyColumns;
+    }
+
+    public ImmutableList<ColumnMetadata> clusteringColumns()
+    {
+        return clusteringColumns;
+    }
+
+    public RegularAndStaticColumns regularAndStaticColumns()
+    {
+        return regularAndStaticColumns;
+    }
+
+    public Columns regularColumns()
+    {
+        return regularAndStaticColumns.regulars;
+    }
+
+    public Columns staticColumns()
+    {
+        return regularAndStaticColumns.statics;
+    }
+
+    /*
+     * An iterator over all column definitions that respects the order of a SELECT *.
+     * This also "hides" the clustering/regular columns for a non-CQL3 non-dense table for backward compatibility's
+     * sake.
+     */
+    public Iterator<ColumnMetadata> allColumnsInSelectOrder()
+    {
+        final boolean isStaticCompactTable = isStaticCompactTable();
+        final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
+
+        return new AbstractIterator<ColumnMetadata>()
+        {
+            private final Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
+            private final Iterator<ColumnMetadata> clusteringIter =
+                isStaticCompactTable ? Collections.emptyIterator() : clusteringColumns.iterator();
+            private final Iterator<ColumnMetadata> otherColumns =
+                noNonPkColumns
+              ? Collections.emptyIterator()
+              : (isStaticCompactTable ? staticColumns().selectOrderIterator()
+                                      : regularAndStaticColumns.selectOrderIterator());
+
+            protected ColumnMetadata computeNext()
+            {
+                if (partitionKeyIter.hasNext())
+                    return partitionKeyIter.next();
+
+                if (clusteringIter.hasNext())
+                    return clusteringIter.next();
+
+                return otherColumns.hasNext() ? otherColumns.next() : endOfData();
+            }
+        };
+    }
+
+    /**
+     * Returns the ColumnMetadata for {@code name}.
+     */
+    public ColumnMetadata getColumn(ColumnIdentifier name)
+    {
+        return columns.get(name.bytes);
+    }
+
+    /*
+     * In general it is preferable to work with ColumnIdentifier to make it
+     * clear that we are talking about a CQL column, not a cell name, but there
+     * are a few cases where all we have is a ByteBuffer (when dealing with IndexExpression
+     * for instance) so...
+     */
+    public ColumnMetadata getColumn(ByteBuffer name)
+    {
+        return columns.get(name);
+    }
+
+    public ColumnMetadata getDroppedColumn(ByteBuffer name)
+    {
+        DroppedColumn dropped = droppedColumns.get(name);
+        return dropped == null ? null : dropped.column;
+    }
+
+    /**
+     * Returns a "fake" ColumnMetadata corresponding to the dropped column {@code name}
+     * or {@code null} if there is no such dropped column.
+     *
+     * @param name - the column name
+     * @param isStatic - whether the column was a static column, if known
+     */
+    public ColumnMetadata getDroppedColumn(ByteBuffer name, boolean isStatic)
+    {
+        DroppedColumn dropped = droppedColumns.get(name);
+        if (dropped == null)
+            return null;
+
+        if (isStatic && !dropped.column.isStatic())
+            return ColumnMetadata.staticColumn(this, name, dropped.column.type);
+
+        return dropped.column;
+    }
+
+    public boolean hasStaticColumns()
+    {
+        return !staticColumns().isEmpty();
+    }
+
+    public void validate()
+    {
+        if (!isNameValid(keyspace))
+            except("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, keyspace);
+
+        if (!isNameValid(name))
+            except("Table name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, name);
+
+        params.validate();
+
+        if (partitionKeyColumns.stream().anyMatch(c -> c.type.isCounter()))
+            except("PRIMARY KEY columns cannot contain counters");
+
+        // Mixing counter with non counter columns is not supported (#2614)
+        if (isCounter())
+        {
+            for (ColumnMetadata column : regularAndStaticColumns)
+                if (!(column.type.isCounter()) && !CompactTables.isSuperColumnMapColumn(column))
+                    except("Cannot have a non counter column (\"%s\") in a counter table", column.name);
+        }
+        else
+        {
+            for (ColumnMetadata column : regularAndStaticColumns)
+                if (column.type.isCounter())
+                    except("Cannot have a counter column (\"%s\") in a non counter column table", column.name);
+        }
+
+        // All tables should have a partition key
+        if (partitionKeyColumns.isEmpty())
+            except("Missing partition keys for table %s", toString());
+
+        // A compact table should always have a clustering
+        if (isCompactTable() && clusteringColumns.isEmpty())
+            except("For table %s, isDense=%b, isCompound=%b, clustering=%s", toString(), isDense(), isCompound(), clusteringColumns);
+
+        if (!indexes.isEmpty() && isSuper())
+            except("Secondary indexes are not supported on super column families");
+
+        indexes.validate(this);
+    }
+
+    void validateCompatibility(TableMetadata other) // throws ConfigurationException (via except) on the first mismatch found
+    {
+        if (isIndex())
+            return; // no compatibility checks are performed when this metadata describes an index table
+
+        if (!other.keyspace.equals(keyspace))
+            except("Keyspace mismatch (found %s; expected %s)", other.keyspace, keyspace);
+
+        if (!other.name.equals(name))
+            except("Table mismatch (found %s; expected %s)", other.name, name);
+
+        if (!other.id.equals(id))
+            except("Table ID mismatch (found %s; expected %s)", other.id, id);
+
+        if (!other.flags.equals(flags))
+            except("Table type mismatch (found %s; expected %s)", other.flags, flags);
+
+        if (other.partitionKeyColumns.size() != partitionKeyColumns.size())
+            except("Partition keys of different length (found %s; expected %s)", other.partitionKeyColumns.size(), partitionKeyColumns.size());
+
+        for (int i = 0; i < partitionKeyColumns.size(); i++)
+            if (!other.partitionKeyColumns.get(i).type.isCompatibleWith(partitionKeyColumns.get(i).type))
+                except("Partition key column mismatch (found %s; expected %s)", other.partitionKeyColumns.get(i).type, partitionKeyColumns.get(i).type);
+
+        if (other.clusteringColumns.size() != clusteringColumns.size())
+            except("Clustering columns of different length (found %s; expected %s)", other.clusteringColumns.size(), clusteringColumns.size());
+
+        for (int i = 0; i < clusteringColumns.size(); i++)
+            if (!other.clusteringColumns.get(i).type.isCompatibleWith(clusteringColumns.get(i).type))
+                except("Clustering column mismatch (found %s; expected %s)", other.clusteringColumns.get(i).type, clusteringColumns.get(i).type);
+
+        for (ColumnMetadata otherColumn : other.regularAndStaticColumns)
+        {
+            ColumnMetadata column = getColumn(otherColumn.name); // columns of other that this table does not have are skipped
+            if (column != null && !otherColumn.type.isCompatibleWith(column.type))
+                except("Column mismatch (found %s; expected %s)", otherColumn, column);
+        }
+    }
+
+    public ClusteringComparator partitionKeyAsClusteringComparator()
+    {
+        return new ClusteringComparator(partitionKeyColumns.stream().map(c -> c.type).collect(toList()));
+    }
+
+    /**
+     * The type to use to compare column names in "static compact"
+     * tables or superColumn ones.
+     * <p>
+     * This exists because for historical reasons, "static compact" tables as
+     * well as super column ones can have non-UTF8 column names.
+     * <p>
+     * This method should only be called for superColumn tables and "static
+     * compact" ones. For any other table, all column names are UTF8.
+     */
+    public AbstractType<?> staticCompactOrSuperTableColumnNameType()
+    {
+        if (isSuper())
+        {
+            assert compactValueColumn != null && compactValueColumn.type instanceof MapType;
+            return ((MapType) compactValueColumn.type).nameComparator();
+        }
+
+        assert isStaticCompactTable();
+        return clusteringColumns.get(0).type;
+    }
+
+    public AbstractType<?> columnDefinitionNameComparator(ColumnMetadata.Kind kind)
+    {
+        return (isSuper() && kind == ColumnMetadata.Kind.REGULAR) || (isStaticCompactTable() && kind == ColumnMetadata.Kind.STATIC)
+             ? staticCompactOrSuperTableColumnNameType()
+             : UTF8Type.instance;
+    }
+
+    /**
+     * Generate a table name for an index corresponding to the given column.
+     * This is NOT the same as the index's name! This is only used in sstable filenames and is not exposed to users.
+     *
+     * @param info A definition of the column with index
+     *
+     * @return name of the index table
+     */
+    public String indexTableName(IndexMetadata info)
+    {
+        // TODO simplify this when info.index_name is guaranteed to be set
+        return name + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name;
+    }
+
+    /**
+     * @return true if the change as made impacts queries/updates on the table,
+     *         e.g. any columns or indexes were added, removed, or altered; otherwise, false is returned.
+     *         Used to determine whether prepared statements against this table need to be re-prepared.
+     */
+    boolean changeAffectsPreparedStatements(TableMetadata updated)
+    {
+        return !partitionKeyColumns.equals(updated.partitionKeyColumns)
+            || !clusteringColumns.equals(updated.clusteringColumns)
+            || !regularAndStaticColumns.equals(updated.regularAndStaticColumns)
+            || !indexes.equals(updated.indexes)
+            || params.defaultTimeToLive != updated.params.defaultTimeToLive
+            || params.gcGraceSeconds != updated.params.gcGraceSeconds;
+    }
+
+    /**
+     * There are a couple of places in the code where we need a TableMetadata object and don't have one readily available
+     * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
+     * you're doing.
+     */
+    public static TableMetadata minimal(String keyspace, String name)
+    {
+        return TableMetadata.builder(keyspace, name)
+                            .addPartitionKeyColumn("key", BytesType.instance)
+                            .build();
+    }
+
+    public TableMetadata updateIndexTableMetadata(TableParams baseTableParams)
+    {
+        TableParams.Builder builder =
+            baseTableParams.unbuild()
+                           .readRepairChance(0.0)
+                           .dcLocalReadRepairChance(0.0)
+                           .gcGraceSeconds(0);
+
+        // Depends on parent's cache setting, turn on its index table's cache.
+        // Row caching is never enabled; see CASSANDRA-5732
+        builder.caching(baseTableParams.caching.cacheKeys() ? CachingParams.CACHE_KEYS : CachingParams.CACHE_NOTHING);
+
+        return unbuild().params(builder.build()).build();
+    }
+
+    private void except(String format, Object... args)
+    {
+        throw new ConfigurationException(format(format, args));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof TableMetadata))
+            return false;
+
+        TableMetadata tm = (TableMetadata) o;
+
+        return keyspace.equals(tm.keyspace)
+            && name.equals(tm.name)
+            && id.equals(tm.id)
+            && partitioner.equals(tm.partitioner)
+            && params.equals(tm.params)
+            && flags.equals(tm.flags)
+            && isView == tm.isView
+            && columns.equals(tm.columns)
+            && droppedColumns.equals(tm.droppedColumns)
+            && indexes.equals(tm.indexes)
+            && triggers.equals(tm.triggers);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(name));
+    }
+
+    public String toDebugString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add("keyspace", keyspace)
+                          .add("table", name)
+                          .add("id", id)
+                          .add("partitioner", partitioner)
+                          .add("params", params)
+                          .add("flags", flags)
+                          .add("isView", isView)
+                          .add("columns", columns())
+                          .add("droppedColumns", droppedColumns.values())
+                          .add("indexes", indexes)
+                          .add("triggers", triggers)
+                          .toString();
+    }
+
+    public static final class Builder
+    {
+        final String keyspace;
+        final String name;
+
+        private TableId id;
+
+        private IPartitioner partitioner;
+        private TableParams.Builder params = TableParams.builder();
+
+        // Setting compound as default as "normal" CQL tables are compound and that's what we want by default
+        private Set<Flag> flags = EnumSet.of(Flag.COMPOUND);
+        private Triggers triggers = Triggers.none();
+        private Indexes indexes = Indexes.none();
+
+        private final Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
+        private final Map<ByteBuffer, ColumnMetadata> columns = new HashMap<>();
+        private final List<ColumnMetadata> partitionKeyColumns = new ArrayList<>();
+        private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
+        private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
+
+        private boolean isView;
+
+        private Builder(String keyspace, String name, TableId id)
+        {
+            this.keyspace = keyspace;
+            this.name = name;
+            this.id = id;
+        }
+
+        private Builder(String keyspace, String name)
+        {
+            this.keyspace = keyspace;
+            this.name = name;
+        }
+
+        public TableMetadata build()
+        {
+            if (partitioner == null)
+                partitioner = DatabaseDescriptor.getPartitioner();
+
+            if (id == null)
+                id = TableId.generate();
+
+            return new TableMetadata(this);
+        }
+
+        public Builder id(TableId val)
+        {
+            id = val;
+            return this;
+        }
+
+        public Builder partitioner(IPartitioner val)
+        {
+            partitioner = val;
+            return this;
+        }
+
+        public Builder params(TableParams val)
+        {
+            params = val.unbuild();
+            return this;
+        }
+
+        public Builder bloomFilterFpChance(double val)
+        {
+            params.bloomFilterFpChance(val);
+            return this;
+        }
+
+        public Builder caching(CachingParams val)
+        {
+            params.caching(val);
+            return this;
+        }
+
+        public Builder comment(String val)
+        {
+            params.comment(val);
+            return this;
+        }
+
+        public Builder compaction(CompactionParams val)
+        {
+            params.compaction(val);
+            return this;
+        }
+
+        public Builder compression(CompressionParams val)
+        {
+            params.compression(val);
+            return this;
+        }
+
+        public Builder dcLocalReadRepairChance(double val)
+        {
+            params.dcLocalReadRepairChance(val);
+            return this;
+        }
+
+        public Builder defaultTimeToLive(int val)
+        {
+            params.defaultTimeToLive(val);
+            return this;
+        }
+
+        public Builder gcGraceSeconds(int val)
+        {
+            params.gcGraceSeconds(val);
+            return this;
+        }
+
+        public Builder maxIndexInterval(int val)
+        {
+            params.maxIndexInterval(val);
+            return this;
+        }
+
+        public Builder memtableFlushPeriod(int val)
+        {
+            params.memtableFlushPeriodInMs(val);
+            return this;
+        }
+
+        public Builder minIndexInterval(int val)
+        {
+            params.minIndexInterval(val);
+            return this;
+        }
+
+        public Builder readRepairChance(double val)
+        {
+            params.readRepairChance(val);
+            return this;
+        }
+
+        public Builder crcCheckChance(double val)
+        {
+            params.crcCheckChance(val);
+            return this;
+        }
+
+        public Builder speculativeRetry(SpeculativeRetryParam val)
+        {
+            params.speculativeRetry(val);
+            return this;
+        }
+
+        public Builder extensions(Map<String, ByteBuffer> val)
+        {
+            params.extensions(val);
+            return this;
+        }
+
+        public Builder isView(boolean val)
+        {
+            isView = val;
+            return this;
+        }
+
+        public Builder flags(Set<Flag> val)
+        {
+            flags = val.isEmpty() ? EnumSet.noneOf(Flag.class) : EnumSet.copyOf(val); // mutable copy: flag() adds/removes in place and unbuild() passes an ImmutableSet
+            return this;
+        }
+
+        public Builder isSuper(boolean val)
+        {
+            return flag(Flag.SUPER, val);
+        }
+
+        public Builder isCounter(boolean val)
+        {
+            return flag(Flag.COUNTER, val);
+        }
+
+        public Builder isDense(boolean val)
+        {
+            return flag(Flag.DENSE, val);
+        }
+
+        public Builder isCompound(boolean val)
+        {
+            return flag(Flag.COMPOUND, val);
+        }
+
+        private Builder flag(Flag flag, boolean set)
+        {
+            if (set) flags.add(flag); else flags.remove(flag);
+            return this;
+        }
+
+        public Builder triggers(Triggers val)
+        {
+            triggers = val;
+            return this;
+        }
+
+        public Builder indexes(Indexes val)
+        {
+            indexes = val;
+            return this;
+        }
+
+        public Builder addPartitionKeyColumn(String name, AbstractType type)
+        {
+            return addPartitionKeyColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addPartitionKeyColumn(ColumnIdentifier name, AbstractType type)
+        {
+            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, partitionKeyColumns.size(), ColumnMetadata.Kind.PARTITION_KEY));
+        }
+
+        public Builder addClusteringColumn(String name, AbstractType type)
+        {
+            return addClusteringColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addClusteringColumn(ColumnIdentifier name, AbstractType type)
+        {
+            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, clusteringColumns.size(), ColumnMetadata.Kind.CLUSTERING));
+        }
+
+        public Builder addRegularColumn(String name, AbstractType type)
+        {
+            return addRegularColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addRegularColumn(ColumnIdentifier name, AbstractType type)
+        {
+            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.REGULAR));
+        }
+
+        public Builder addStaticColumn(String name, AbstractType type)
+        {
+            return addStaticColumn(ColumnIdentifier.getInterned(name, false), type);
+        }
+
+        public Builder addStaticColumn(ColumnIdentifier name, AbstractType type)
+        {
+            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC));
+        }
+
+        public Builder addColumn(ColumnMetadata column)
+        {
+            if (columns.containsKey(column.name.bytes))
+                throw new IllegalArgumentException();
+
+            switch (column.kind)
+            {
+                case PARTITION_KEY:
+                    partitionKeyColumns.add(column);
+                    Collections.sort(partitionKeyColumns);
+                    break;
+                case CLUSTERING:
+                    column.type.checkComparable();
+                    clusteringColumns.add(column);
+                    Collections.sort(clusteringColumns);
+                    break;
+                default:
+                    regularAndStaticColumns.add(column);
+            }
+
+            columns.put(column.name.bytes, column);
+
+            return this;
+        }
+
+        public Builder addColumns(Iterable<ColumnMetadata> columns)
+        {
+            columns.forEach(this::addColumn);
+            return this;
+        }
+
+        public Builder droppedColumns(Map<ByteBuffer, DroppedColumn> droppedColumns)
+        {
+            this.droppedColumns.clear();
+            this.droppedColumns.putAll(droppedColumns);
+            return this;
+        }
+
+        /**
+         * Records a deprecated column for a system table.
+         */
+        public Builder recordDeprecatedSystemColumn(String name, AbstractType<?> type)
+        {
+            // As we play fast and loose with the removal timestamp, make sure this isn't misused for a non-system table.
+            assert SchemaConstants.isSystemKeyspace(keyspace);
+            recordColumnDrop(ColumnMetadata.regularColumn(keyspace, this.name, name, type), Long.MAX_VALUE);
+            return this;
+        }
+
+        public Builder recordColumnDrop(ColumnMetadata column, long timeMicros)
+        {
+            droppedColumns.put(column.name.bytes, new DroppedColumn(column, timeMicros));
+            return this;
+        }
+
+        public Iterable<ColumnMetadata> columns()
+        {
+            return columns.values();
+        }
+
+        public Set<String> columnNames()
+        {
+            return columns.values().stream().map(c -> c.name.toString()).collect(toSet());
+        }
+
+        public ColumnMetadata getColumn(ColumnIdentifier identifier)
+        {
+            return columns.get(identifier.bytes);
+        }
+
+        public ColumnMetadata getColumn(ByteBuffer name)
+        {
+            return columns.get(name);
+        }
+
+        public boolean hasRegularColumns()
+        {
+            return regularAndStaticColumns.stream().anyMatch(ColumnMetadata::isRegular);
+        }
+
+        /*
+         * The following methods all assume a Builder with a valid set of partition key, clustering, regular and static columns.
+         */
+
+        public Builder removeRegularOrStaticColumn(ColumnIdentifier identifier)
+        {
+            ColumnMetadata column = columns.get(identifier.bytes);
+            if (column == null || column.isPrimaryKeyColumn())
+                throw new IllegalArgumentException();
+
+            columns.remove(identifier.bytes);
+            regularAndStaticColumns.remove(column);
+
+            return this;
+        }
+
+        public Builder renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
+        {
+            if (columns.containsKey(to.bytes))
+                throw new IllegalArgumentException();
+
+            ColumnMetadata column = columns.get(from.bytes);
+            if (column == null || !column.isPrimaryKeyColumn())
+                throw new IllegalArgumentException();
+
+            ColumnMetadata newColumn = column.withNewName(to);
+            if (column.isPartitionKey())
+                partitionKeyColumns.set(column.position(), newColumn);
+            else
+                clusteringColumns.set(column.position(), newColumn);
+
+            columns.remove(from.bytes);
+            columns.put(to.bytes, newColumn);
+
+            return this;
+        }
+
+        public Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
+        {
+            ColumnMetadata column = columns.get(name.bytes);
+            if (column == null)
+                throw new IllegalArgumentException();
+
+            ColumnMetadata newColumn = column.withNewType(type);
+
+            switch (column.kind)
+            {
+                case PARTITION_KEY:
+                    partitionKeyColumns.set(column.position(), newColumn);
+                    break;
+                case CLUSTERING:
+                    clusteringColumns.set(column.position(), newColumn);
+                    break;
+                case REGULAR:
+                case STATIC:
+                    regularAndStaticColumns.remove(column);
+                    regularAndStaticColumns.add(newColumn);
+                    break;
+            }
+
+            columns.put(column.name.bytes, newColumn);
+
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/TableMetadataRef.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadataRef.java b/src/java/org/apache/cassandra/schema/TableMetadataRef.java
new file mode 100644
index 0000000..5ff9d5b
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/TableMetadataRef.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import org.github.jamm.Unmetered;
+
+/**
+ * Encapsulates a volatile reference to an immutable {@link TableMetadata} instance.
+ *
+ * Used in classes that need up-to-date metadata to avoid the cost of looking up {@link Schema} hashmaps.
+ *
+ * The {@code id}, {@code keyspace} and {@code name} fields are copied from the initial metadata in the
+ * constructor and are not reassigned when the reference is later updated.
+ */
+@Unmetered
+public final class TableMetadataRef
+{
+    // copied once from the metadata passed to the constructor
+    public final TableId id;
+    public final String keyspace;
+    public final String name;
+
+    // the currently referenced metadata; volatile so readers observe the latest value written by set()
+    private volatile TableMetadata metadata;
+
+    /**
+     * Package-private constructor; external code goes through {@link #forOfflineTools(TableMetadata)}.
+     *
+     * @param metadata the initial {@link TableMetadata} to reference
+     */
+    TableMetadataRef(TableMetadata metadata)
+    {
+        this.metadata = metadata;
+
+        id = metadata.id;
+        keyspace = metadata.keyspace;
+        name = metadata.name;
+    }
+
+    /**
+     * Create a new ref to the passed {@link TableMetadata} for use by offline tools only.
+     *
+     * @param metadata {@link TableMetadata} to reference
+     * @return a new TableMetadataRef instance linking to the passed {@link TableMetadata}
+     */
+    public static TableMetadataRef forOfflineTools(TableMetadata metadata)
+    {
+        return new TableMetadataRef(metadata);
+    }
+
+    /**
+     * @return the {@link TableMetadata} currently held by this reference
+     */
+    public TableMetadata get()
+    {
+        return metadata;
+    }
+
+    /**
+     * Update the reference with the most current version of {@link TableMetadata}
+     * <p>
+     * Must only be used by methods in {@link Schema}, *DO NOT* make public
+     * even for testing purposes, it isn't safe.
+     */
+    void set(TableMetadata metadata)
+    {
+        // the current metadata checks the replacement before the swap
+        // NOTE(review): presumably validateCompatibility throws on an incompatible update - confirm
+        get().validateCompatibility(metadata);
+        this.metadata = metadata;
+    }
+
+    /**
+     * @return the string form of the currently referenced {@link TableMetadata}
+     */
+    @Override
+    public String toString()
+    {
+        return get().toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 02112af..e68048c 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -133,6 +133,11 @@ public final class TableParams
                             .cdc(params.cdc);
     }
 
+    /**
+     * @return the {@link Builder} produced by {@code builder(this)}, i.e. one seeded from this instance
+     */
+    public Builder unbuild()
+    {
+        return builder(this);
+    }
+
     public void validate()
     {
         compaction.validate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Tables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java
index 4f728d4..a83c061 100644
--- a/src/java/org/apache/cassandra/schema/Tables.java
+++ b/src/java/org/apache/cassandra/schema/Tables.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.schema;
 
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
 
 import javax.annotation.Nullable;
@@ -26,20 +28,22 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.index.internal.CassandraIndex;
 
 import static com.google.common.collect.Iterables.filter;
 
 /**
  * An immutable container for a keyspace's Tables.
  */
-public final class Tables implements Iterable<CFMetaData>
+public final class Tables implements Iterable<TableMetadata>
 {
-    private final ImmutableMap<String, CFMetaData> tables;
+    private final ImmutableMap<String, TableMetadata> tables;
+    private final ImmutableMap<String, TableMetadata> indexTables;
 
     private Tables(Builder builder)
     {
         tables = builder.tables.build();
+        indexTables = builder.indexTables.build();
     }
 
     public static Builder builder()
@@ -52,21 +56,26 @@ public final class Tables implements Iterable<CFMetaData>
         return builder().build();
     }
 
-    public static Tables of(CFMetaData... tables)
+    public static Tables of(TableMetadata... tables)
     {
         return builder().add(tables).build();
     }
 
-    public static Tables of(Iterable<CFMetaData> tables)
+    public static Tables of(Iterable<TableMetadata> tables)
     {
         return builder().add(tables).build();
     }
 
-    public Iterator<CFMetaData> iterator()
+    public Iterator<TableMetadata> iterator()
     {
         return tables.values().iterator();
     }
 
+    /**
+     * @return the immutable map of index tables collected by the builder, keyed by index name
+     */
+    ImmutableMap<String, TableMetadata> indexTables()
+    {
+        return indexTables;
+    }
+
     public int size()
     {
         return tables.size();
@@ -76,9 +85,9 @@ public final class Tables implements Iterable<CFMetaData>
      * Get the table with the specified name
      *
      * @param name a non-qualified table name
-     * @return an empty {@link Optional} if the table name is not found; a non-empty optional of {@link CFMetaData} otherwise
+     * @return an empty {@link Optional} if the table name is not found; a non-empty optional of {@link TableMetadata} otherwise
      */
-    public Optional<CFMetaData> get(String name)
+    public Optional<TableMetadata> get(String name)
     {
         return Optional.ofNullable(tables.get(name));
     }
@@ -87,39 +96,67 @@ public final class Tables implements Iterable<CFMetaData>
      * Get the table with the specified name
      *
      * @param name a non-qualified table name
-     * @return null if the table name is not found; the found {@link CFMetaData} otherwise
+     * @return null if the table name is not found; the found {@link TableMetadata} otherwise
      */
     @Nullable
-    public CFMetaData getNullable(String name)
+    public TableMetadata getNullable(String name)
     {
         return tables.get(name);
     }
 
+    /**
+     * Get the index table registered under the specified name
+     *
+     * @param name key of the index table in the index tables map
+     * @return null if no index table is found under that name; the found {@link TableMetadata} otherwise
+     */
+    @Nullable
+    public TableMetadata getIndexTableNullable(String name)
+    {
+        return indexTables.get(name);
+    }
+
     /**
      * Create a Tables instance with the provided table added
      */
-    public Tables with(CFMetaData table)
+    public Tables with(TableMetadata table)
     {
-        if (get(table.cfName).isPresent())
-            throw new IllegalStateException(String.format("Table %s already exists", table.cfName));
+        if (get(table.name).isPresent())
+            throw new IllegalStateException(String.format("Table %s already exists", table.name));
 
         return builder().add(this).add(table).build();
     }
 
+    /**
+     * Creates a Tables instance in which the table with the same name as the provided one is replaced by it.
+     * Implemented as a removal by name followed by an addition, so a table with that name must already exist.
+     */
+    public Tables withSwapped(TableMetadata table)
+    {
+        return without(table.name).with(table);
+    }
+
     /**
      * Creates a Tables instance with the table with the provided name removed
      */
     public Tables without(String name)
     {
-        CFMetaData table =
+        TableMetadata table =
             get(name).orElseThrow(() -> new IllegalStateException(String.format("Table %s doesn't exists", name)));
 
         return builder().add(filter(this, t -> t != table)).build();
     }
 
-    MapDifference<String, CFMetaData> diff(Tables other)
+    MapDifference<TableId, TableMetadata> diff(Tables other)
     {
-        return Maps.difference(tables, other.tables);
+        Map<TableId, TableMetadata> thisTables = new HashMap<>();
+        this.forEach(t -> thisTables.put(t.id, t));
+
+        Map<TableId, TableMetadata> otherTables = new HashMap<>();
+        other.forEach(t -> otherTables.put(t.id, t));
+
+        return Maps.difference(thisTables, otherTables);
+    }
+
+    /**
+     * Computes the difference between this instance's index tables and another's, with both sides
+     * re-keyed by {@code indexName().get()} before comparison.
+     *
+     * @param other the Tables instance to compare against
+     * @return the {@link MapDifference} between the two index name to index table maps
+     */
+    MapDifference<String, TableMetadata> indexesDiff(Tables other)
+    {
+        Map<String, TableMetadata> thisIndexTables = new HashMap<>();
+        this.indexTables.values().forEach(t -> thisIndexTables.put(t.indexName().get(), t));
+
+        Map<String, TableMetadata> otherIndexTables = new HashMap<>();
+        other.indexTables.values().forEach(t -> otherIndexTables.put(t.indexName().get(), t));
+
+        return Maps.difference(thisIndexTables, otherIndexTables);
     }
 
     @Override
@@ -142,7 +179,8 @@ public final class Tables implements Iterable<CFMetaData>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, CFMetaData> tables = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<String, TableMetadata> tables = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<String, TableMetadata> indexTables = new ImmutableMap.Builder<>();
 
         private Builder()
         {
@@ -153,20 +191,27 @@ public final class Tables implements Iterable<CFMetaData>
             return new Tables(this);
         }
 
-        public Builder add(CFMetaData table)
+        public Builder add(TableMetadata table)
         {
-            tables.put(table.cfName, table);
+            tables.put(table.name, table);
+
+            table.indexes
+                 .stream()
+                 .filter(i -> !i.isCustom())
+                 .map(i -> CassandraIndex.indexCfsMetadata(table, i))
+                 .forEach(i -> indexTables.put(i.indexName().get(), i));
+
             return this;
         }
 
-        public Builder add(CFMetaData... tables)
+        public Builder add(TableMetadata... tables)
         {
-            for (CFMetaData table : tables)
+            for (TableMetadata table : tables)
                 add(table);
             return this;
         }
 
-        public Builder add(Iterable<CFMetaData> tables)
+        public Builder add(Iterable<TableMetadata> tables)
         {
             tables.forEach(this::add);
             return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Triggers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Triggers.java b/src/java/org/apache/cassandra/schema/Triggers.java
index bb39f1f..5e10722 100644
--- a/src/java/org/apache/cassandra/schema/Triggers.java
+++ b/src/java/org/apache/cassandra/schema/Triggers.java
@@ -43,6 +43,16 @@ public final class Triggers implements Iterable<TriggerMetadata>
         return builder().build();
     }
 
+    /**
+     * Creates a Triggers instance containing the provided triggers
+     */
+    public static Triggers of(TriggerMetadata... triggers)
+    {
+        return builder().add(triggers).build();
+    }
+
+    /**
+     * Creates a Triggers instance containing every trigger of the provided iterable
+     */
+    public static Triggers of(Iterable<TriggerMetadata> triggers)
+    {
+        return builder().add(triggers).build();
+    }
+
     public Iterator<TriggerMetadata> iterator()
     {
         return triggers.values().iterator();
@@ -128,6 +138,13 @@ public final class Triggers implements Iterable<TriggerMetadata>
             return this;
         }
 
+        /**
+         * Adds each of the provided triggers, in order, through the single-trigger {@code add}.
+         *
+         * @return this builder
+         */
+        public Builder add(TriggerMetadata... triggers)
+        {
+            for (TriggerMetadata trigger : triggers)
+                add(trigger);
+            return this;
+        }
+
         public Builder add(Iterable<TriggerMetadata> triggers)
         {
             triggers.forEach(this::add);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
deleted file mode 100644
index 5daf631..0000000
--- a/src/java/org/apache/cassandra/schema/UnknownIndexException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.schema;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.cassandra.config.CFMetaData;
-
-/**
- * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata
- * can be found in the CFMetaData#indexes collection. Note that this is an internal exception and is not meant
- * to be user facing, the node reading the ReadCommand should proceed as if no index id were present.
- */
-public class UnknownIndexException extends IOException
-{
-    public final UUID indexId;
-    public UnknownIndexException(CFMetaData metadata, UUID id)
-    {
-        super(String.format("Unknown index %s for table %s.%s", id.toString(), metadata.ksName, metadata.cfName));
-        indexId = id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/ViewMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ViewMetadata.java b/src/java/org/apache/cassandra/schema/ViewMetadata.java
new file mode 100644
index 0000000..57f4092
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/ViewMetadata.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.antlr.runtime.*;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.SyntaxException;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+/**
+ * Immutable description of a materialized view: its own name and table metadata, the base table it is
+ * defined on, and the select statement / where clause it was defined with.
+ *
+ * Modifications are expressed as methods returning a new instance.
+ */
+public final class ViewMetadata
+{
+    public final String keyspace;
+    public final String name;
+    public final TableId baseTableId;
+    public final String baseTableName;
+    public final boolean includeAllColumns;
+    public final TableMetadata metadata;
+
+    public final SelectStatement.RawStatement select;
+    public final String whereClause;
+
+    /**
+     * @param keyspace          Name of the keyspace
+     * @param name              Name of the view
+     * @param baseTableId       Internal ID of the table which this view is based off of
+     * @param baseTableName     Name of the table which this view is based off of
+     * @param includeAllColumns Whether to include all columns or not
+     * @param select            Raw select statement of the view
+     * @param whereClause       Where clause of the view, as a string
+     * @param metadata          Table metadata of the view itself
+     */
+    public ViewMetadata(String keyspace,
+                        String name,
+                        TableId baseTableId,
+                        String baseTableName,
+                        boolean includeAllColumns,
+                        SelectStatement.RawStatement select,
+                        String whereClause,
+                        TableMetadata metadata)
+    {
+        this.keyspace = keyspace;
+        this.name = name;
+        this.baseTableId = baseTableId;
+        this.baseTableName = baseTableName;
+        this.includeAllColumns = includeAllColumns;
+        this.select = select;
+        this.whereClause = whereClause;
+        this.metadata = metadata;
+    }
+
+    /**
+     * @return true if the view specified by this definition will include the column, false otherwise
+     */
+    public boolean includes(ColumnIdentifier column)
+    {
+        return metadata.getColumn(column) != null;
+    }
+
+    /**
+     * @return a new ViewMetadata identical to this one except for its table metadata
+     */
+    public ViewMetadata copy(TableMetadata newMetadata)
+    {
+        return new ViewMetadata(keyspace, name, baseTableId, baseTableName, includeAllColumns, select, whereClause, newMetadata);
+    }
+
+    /**
+     * @return the result of looking up {@link #baseTableId} in {@link Schema#instance}
+     */
+    public TableMetadata baseTableMetadata()
+    {
+        return Schema.instance.getTableMetadata(baseTableId);
+    }
+
+    /**
+     * Two views are equal when keyspace, name, base table id, includeAllColumns, where clause and
+     * table metadata are equal; {@code baseTableName} and {@code select} are not compared.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ViewMetadata))
+            return false;
+
+        ViewMetadata other = (ViewMetadata) o;
+        return Objects.equals(keyspace, other.keyspace)
+               && Objects.equals(name, other.name)
+               && Objects.equals(baseTableId, other.baseTableId)
+               && Objects.equals(includeAllColumns, other.includeAllColumns)
+               && Objects.equals(whereClause, other.whereClause)
+               && Objects.equals(metadata, other.metadata);
+    }
+
+    /**
+     * Hashes the same fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode()
+    {
+        return new HashCodeBuilder(29, 1597)
+               .append(keyspace)
+               .append(name)
+               .append(baseTableId)
+               .append(includeAllColumns)
+               .append(whereClause)
+               .append(metadata)
+               .toHashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return new ToStringBuilder(this)
+               .append("keyspace", keyspace)
+               .append("name", name)
+               .append("baseTableId", baseTableId)
+               .append("baseTableName", baseTableName)
+               .append("includeAllColumns", includeAllColumns)
+               .append("whereClause", whereClause)
+               .append("metadata", metadata)
+               .toString();
+    }
+
+    /**
+     * Replace the column 'from' with 'to' in this materialized view definition's partition,
+     * clustering, or included columns.
+     * @param from the existing column
+     * @param to the new column
+     * @return a new ViewMetadata whose where clause, select statement and table metadata all use 'to'
+     * @throws RuntimeException if the current where clause cannot be parsed
+     */
+    public ViewMetadata renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
+    {
+        // convert whereClause to Relations, rename ids in Relations, then convert back to whereClause
+        List<Relation> relations = whereClauseToRelations(whereClause);
+        ColumnMetadata.Raw fromRaw = ColumnMetadata.Raw.forQuoted(from.toString());
+        ColumnMetadata.Raw toRaw = ColumnMetadata.Raw.forQuoted(to.toString());
+        List<Relation> newRelations =
+            relations.stream()
+                     .map(r -> r.renameIdentifier(fromRaw, toRaw))
+                     .collect(Collectors.toList());
+
+        // The select must be rebuilt from the renamed where clause and renamed columns; building it from
+        // the pre-rename state would leave it referring to 'from' while the rest of the view uses 'to'.
+        String newWhereClause = View.relationsToWhereClause(newRelations);
+        TableMetadata newMetadata = metadata.unbuild().renamePrimaryKeyColumn(from, to).build();
+        String rawSelect = View.buildSelectStatement(baseTableName, newMetadata.columns(), newWhereClause);
+
+        return new ViewMetadata(keyspace,
+                                name,
+                                baseTableId,
+                                baseTableName,
+                                includeAllColumns,
+                                (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect),
+                                newWhereClause,
+                                newMetadata);
+    }
+
+    /**
+     * @return a new ViewMetadata whose table metadata has the given column added through its builder
+     */
+    public ViewMetadata withAddedRegularColumn(ColumnMetadata column)
+    {
+        return new ViewMetadata(keyspace,
+                                name,
+                                baseTableId,
+                                baseTableName,
+                                includeAllColumns,
+                                select,
+                                whereClause,
+                                metadata.unbuild().addColumn(column).build());
+    }
+
+    /**
+     * @param name identifier of the column whose type changes
+     * @param type the new type of the column
+     * @return a new ViewMetadata whose table metadata has the named column's type altered
+     */
+    public ViewMetadata withAlteredColumnType(ColumnIdentifier name, AbstractType<?> type)
+    {
+        // 'name' shadows the view name field here, hence the explicit this.name below
+        return new ViewMetadata(keyspace,
+                                this.name,
+                                baseTableId,
+                                baseTableName,
+                                includeAllColumns,
+                                select,
+                                whereClause,
+                                metadata.unbuild().alterColumnType(name, type).build());
+    }
+
+    /**
+     * Parses a where clause string into its relations, wrapping parse failures in a RuntimeException.
+     */
+    private static List<Relation> whereClauseToRelations(String whereClause)
+    {
+        try
+        {
+            return CQLFragmentParser.parseAnyUnhandled(CqlParser::whereClause, whereClause).build().relations;
+        }
+        catch (RecognitionException | SyntaxException exc)
+        {
+            throw new RuntimeException("Unexpected error parsing materialized view's where clause while handling column rename: ", exc);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
index b8fdd4b..6578b14 100644
--- a/src/java/org/apache/cassandra/schema/Views.java
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.schema;
 
-
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import javax.annotation.Nullable;
 
@@ -29,14 +30,11 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ViewDefinition;
-
 import static com.google.common.collect.Iterables.filter;
 
-public final class Views implements Iterable<ViewDefinition>
+public final class Views implements Iterable<ViewMetadata>
 {
-    private final ImmutableMap<String, ViewDefinition> views;
+    private final ImmutableMap<String, ViewMetadata> views;
 
     private Views(Builder builder)
     {
@@ -53,12 +51,12 @@ public final class Views implements Iterable<ViewDefinition>
         return builder().build();
     }
 
-    public Iterator<ViewDefinition> iterator()
+    public Iterator<ViewMetadata> iterator()
     {
         return views.values().iterator();
     }
 
-    public Iterable<CFMetaData> metadatas()
+    public Iterable<TableMetadata> metadatas()
     {
         return Iterables.transform(views.values(), view -> view.metadata);
     }
@@ -73,13 +71,18 @@ public final class Views implements Iterable<ViewDefinition>
         return views.isEmpty();
     }
 
+    /**
+     * Get the views whose base table has the specified id
+     *
+     * @param tableId id of the base table, as a UUID
+     * @return a lazily filtered view over this instance containing only the matching views
+     */
+    public Iterable<ViewMetadata> forTable(UUID tableId)
+    {
+        // baseTableId is a TableId; equals() against a raw UUID can never match, so compare UUID to UUID
+        // NOTE(review): relies on TableId.asUUID() - confirm against TableId
+        return Iterables.filter(this, v -> v.baseTableId.asUUID().equals(tableId));
+    }
+
     /**
      * Get the materialized view with the specified name
      *
      * @param name a non-qualified materialized view name
-     * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link ViewDefinition} otherwise
+     * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link ViewMetadata} otherwise
      */
-    public Optional<ViewDefinition> get(String name)
+    public Optional<ViewMetadata> get(String name)
     {
         return Optional.ofNullable(views.get(name));
     }
@@ -88,10 +91,10 @@ public final class Views implements Iterable<ViewDefinition>
      * Get the view with the specified name
      *
      * @param name a non-qualified view name
-     * @return null if the view name is not found; the found {@link ViewDefinition} otherwise
+     * @return null if the view name is not found; the found {@link ViewMetadata} otherwise
      */
     @Nullable
-    public ViewDefinition getNullable(String name)
+    public ViewMetadata getNullable(String name)
     {
         return views.get(name);
     }
@@ -99,36 +102,39 @@ public final class Views implements Iterable<ViewDefinition>
     /**
      * Create a MaterializedViews instance with the provided materialized view added
      */
-    public Views with(ViewDefinition view)
+    public Views with(ViewMetadata view)
     {
-        if (get(view.viewName).isPresent())
-            throw new IllegalStateException(String.format("Materialized View %s already exists", view.viewName));
+        if (get(view.name).isPresent())
+            throw new IllegalStateException(String.format("Materialized View %s already exists", view.name));
 
         return builder().add(this).add(view).build();
     }
 
+    /**
+     * Creates a Views instance in which the view with the same name as the provided one is replaced by it.
+     * Implemented as a removal by name followed by an addition, so a view with that name must already exist.
+     */
+    public Views withSwapped(ViewMetadata view)
+    {
+        return without(view.name).with(view);
+    }
+
     /**
      * Creates a MaterializedViews instance with the materializedView with the provided name removed
      */
     public Views without(String name)
     {
-        ViewDefinition materializedView =
+        ViewMetadata materializedView =
             get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
 
         return builder().add(filter(this, v -> v != materializedView)).build();
     }
 
-    /**
-     * Creates a MaterializedViews instance which contains an updated materialized view
-     */
-    public Views replace(ViewDefinition view, CFMetaData cfm)
+    MapDifference<TableId, ViewMetadata> diff(Views other)
     {
-        return without(view.viewName).with(view);
-    }
+        Map<TableId, ViewMetadata> thisViews = new HashMap<>();
+        this.forEach(v -> thisViews.put(v.metadata.id, v));
 
-    MapDifference<String, ViewDefinition> diff(Views other)
-    {
-        return Maps.difference(views, other.views);
+        Map<TableId, ViewMetadata> otherViews = new HashMap<>();
+        other.forEach(v -> otherViews.put(v.metadata.id, v));
+
+        return Maps.difference(thisViews, otherViews);
     }
 
     @Override
@@ -151,7 +157,7 @@ public final class Views implements Iterable<ViewDefinition>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, ViewDefinition> views = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<String, ViewMetadata> views = new ImmutableMap.Builder<>();
 
         private Builder()
         {
@@ -163,13 +169,13 @@ public final class Views implements Iterable<ViewDefinition>
         }
 
 
-        public Builder add(ViewDefinition view)
+        public Builder add(ViewMetadata view)
         {
-            views.put(view.viewName, view);
+            views.put(view.name, view);
             return this;
         }
 
-        public Builder add(Iterable<ViewDefinition> views)
+        public Builder add(Iterable<ViewMetadata> views)
         {
             views.forEach(this::add);
             return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 8944b7c..6e0f45b 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Iterables;
@@ -28,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
@@ -42,6 +42,7 @@ import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.SpeculativeRetryParam;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
@@ -145,17 +146,29 @@ public abstract class AbstractReadExecutor
         return handler.get();
     }
 
+    /**
+     * Picks the read repair scope for one read by drawing a single random double and comparing it
+     * first against the table's readRepairChance, then against its dcLocalReadRepairChance.
+     *
+     * @param metadata metadata of the table being read, supplying both chances through its params
+     * @return GLOBAL if the draw is below readRepairChance, otherwise DC_LOCAL if it is below
+     *         dcLocalReadRepairChance, otherwise NONE
+     */
+    private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata)
+    {
+        // one draw is shared by both comparisons
+        double roll = ThreadLocalRandom.current().nextDouble();
+
+        ReadRepairDecision decision;
+        if (roll < metadata.params.readRepairChance)
+            decision = ReadRepairDecision.GLOBAL;
+        else if (roll < metadata.params.dcLocalReadRepairChance)
+            decision = ReadRepairDecision.DC_LOCAL;
+        else
+            decision = ReadRepairDecision.NONE;
+
+        return decision;
+    }
+
     /**
      * @return an executor appropriate for the configured speculative read policy
      */
     public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
-        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
         List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
         // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses
         ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM
                                             ? ReadRepairDecision.NONE
-                                            : command.metadata().newReadRepairDecision();
+                                            : newReadRepairDecision(command.metadata());
         List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
 
         // Throw UAE early if we don't have enough replicas.
@@ -167,8 +180,8 @@ public abstract class AbstractReadExecutor
             ReadRepairMetrics.attempted.mark();
         }
 
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
-        SpeculativeRetryParam retry = cfs.metadata.params.speculativeRetry;
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
+        SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry;
 
         // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
         // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 7476cd9..e7c6640 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.RepairSession;
 import org.apache.cassandra.repair.messages.*;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
@@ -307,15 +308,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             }
         };
 
-        List<UUID> cfIds = new ArrayList<>(columnFamilyStores.size());
+        List<TableId> tableIds = new ArrayList<>(columnFamilyStores.size());
         for (ColumnFamilyStore cfs : columnFamilyStores)
-            cfIds.add(cfs.metadata.cfId);
+            tableIds.add(cfs.metadata.id);
 
         for (InetAddress neighbour : endpoints)
         {
             if (FailureDetector.instance.isAlive(neighbour))
             {
-                PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
+                PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
                 MessageOut<RepairMessage> msg = message.createMessage();
                 MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
             }
@@ -357,12 +358,12 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, timestamp, isGlobal));
     }
 
-    public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
+    public Set<SSTableReader> currentlyRepairing(TableId tableId, UUID parentRepairSession)
     {
         Set<SSTableReader> repairing = new HashSet<>();
         for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
         {
-            Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(cfId);
+            Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(tableId);
             if (sstables != null && !entry.getKey().equals(parentRepairSession))
                 repairing.addAll(sstables);
         }
@@ -447,7 +448,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         // if we don't have successful repair ranges, then just skip anticompaction
         if (!successfulRanges.isEmpty())
         {
-            for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
+            for (Map.Entry<TableId, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
             {
                 Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession);
                 ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
@@ -504,9 +505,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
      */
     public static class ParentRepairSession
     {
-        private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
+        private final Map<TableId, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
         private final Collection<Range<Token>> ranges;
-        public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
+        public final Map<TableId, Set<String>> sstableMap = new HashMap<>();
         public final boolean isIncremental;
         public final boolean isGlobal;
         public final long repairedAt;
@@ -514,15 +515,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         /**
          * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
          */
-        private final Set<UUID> marked = new HashSet<>();
+        private final Set<TableId> marked = new HashSet<>();
 
         public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
         {
             this.coordinator = coordinator;
             for (ColumnFamilyStore cfs : columnFamilyStores)
             {
-                this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
-                sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
+                this.columnFamilyStores.put(cfs.metadata.id, cfs);
+                sstableMap.put(cfs.metadata.id, new HashSet<>());
             }
             this.ranges = ranges;
             this.repairedAt = repairedAt;
@@ -535,22 +536,22 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
          *
          * whether this is an incremental or full repair
          *
-         * @param cfId the column family
+         * @param tableId the table
          * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables
          */
-        public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId)
+        public synchronized void markSSTablesRepairing(TableId tableId, UUID parentSessionId)
         {
-            if (!marked.contains(cfId))
+            if (!marked.contains(tableId))
             {
-                List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+                List<SSTableReader> sstables = columnFamilyStores.get(tableId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
+                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId);
                 if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
                 {
                     logger.error("Cannot start multiple repair sessions over the same sstables");
                     throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
                 }
-                addSSTables(cfId, sstables);
-                marked.add(cfId);
+                addSSTables(tableId, sstables);
+                marked.add(tableId);
             }
         }
 
@@ -560,26 +561,26 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
          * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this
          * to know which sstables are still there that were there when we started the repair
          *
-         * @param cfId
+         * @param tableId
          * @param parentSessionId for checking if there exists a snapshot for this repair
          * @return
          */
         @SuppressWarnings("resource")
-        public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId)
+        public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(TableId tableId, UUID parentSessionId)
         {
-            assert marked.contains(cfId);
-            if (!columnFamilyStores.containsKey(cfId))
-                throw new RuntimeException("Not possible to get sstables for anticompaction for " + cfId);
-            boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString());
+            assert marked.contains(tableId);
+            if (!columnFamilyStores.containsKey(tableId))
+                throw new RuntimeException("Not possible to get sstables for anticompaction for " + tableId);
+            boolean isSnapshotRepair = columnFamilyStores.get(tableId).snapshotExists(parentSessionId.toString());
             ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
-            Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId);
-            // we check this above - if columnFamilyStores contains the cfId sstables will not be null
+            Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(tableId, parentSessionId) : getActiveSSTables(tableId);
+            // we check this above - if columnFamilyStores contains the tableId, sstables will not be null
             assert sstables != null;
             for (SSTableReader sstable : sstables)
             {
                 Ref<SSTableReader> ref = sstable.tryRef();
                 if (ref == null)
-                    sstableMap.get(cfId).remove(sstable.getFilename());
+                    sstableMap.get(tableId).remove(sstable.getFilename());
                 else
                     references.put(sstable, ref);
             }
@@ -592,14 +593,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
          * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the
          * actual filename.
          *
-         * @param cfId
+         * @param tableId
          * @param parentSessionId
          * @return
          */
-        private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId)
+        private Set<SSTableReader> getSSTablesForSnapshotRepair(TableId tableId, UUID parentSessionId)
         {
             Set<SSTableReader> activeSSTables = new HashSet<>();
-            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+            ColumnFamilyStore cfs = columnFamilyStores.get(tableId);
             if (cfs == null)
                 return null;
 
@@ -621,30 +622,30 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             return activeSSTables;
         }
 
-        public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId)
+        public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId)
         {
             String snapshotName = parentSessionId.toString();
-            if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName))
+            if (!columnFamilyStores.get(tableId).snapshotExists(snapshotName))
             {
-                Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>()
+                Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(tableId).snapshot(snapshotName, new Predicate<SSTableReader>()
                 {
                     public boolean apply(SSTableReader sstable)
                     {
                         return sstable != null &&
                                (!isIncremental || !sstable.isRepaired()) &&
-                               !(sstable.metadata.isIndex()) && // exclude SSTables from 2i
+                               !(sstable.metadata().isIndex()) && // exclude SSTables from 2i
                                new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
                     }
                 }, true, false);
 
-                if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables))
+                if (isAlreadyRepairing(tableId, parentSessionId, snapshottedSSTables))
                 {
-                    columnFamilyStores.get(cfId).clearSnapshot(snapshotName);
+                    columnFamilyStores.get(tableId).clearSnapshot(snapshotName);
                     logger.error("Cannot start multiple repair sessions over the same sstables");
                     throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
                 }
-                addSSTables(cfId, snapshottedSSTables);
-                marked.add(cfId);
+                addSSTables(tableId, snapshottedSSTables);
+                marked.add(tableId);
             }
         }
 
@@ -654,14 +655,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
          *
          * we compare generations since the sstables have different paths due to snapshot names
          *
-         * @param cfId id of the column family store
+         * @param tableId id of the table
          * @param parentSessionId parent repair session
          * @param sstables the newly snapshotted sstables
          * @return
          */
-        private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables)
+        private boolean isAlreadyRepairing(TableId tableId, UUID parentSessionId, Collection<SSTableReader> sstables)
         {
-            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(tableId, parentSessionId);
             Set<Integer> currentlyRepairingGenerations = new HashSet<>();
             Set<Integer> newRepairingGenerations = new HashSet<>();
             for (SSTableReader sstable : currentlyRepairing)
@@ -672,15 +673,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty();
         }
 
-        private Set<SSTableReader> getActiveSSTables(UUID cfId)
+        private Set<SSTableReader> getActiveSSTables(TableId tableId)
         {
-            if (!columnFamilyStores.containsKey(cfId))
+            if (!columnFamilyStores.containsKey(tableId))
                 return null;
 
-            Set<String> repairedSSTables = sstableMap.get(cfId);
+            Set<String> repairedSSTables = sstableMap.get(tableId);
             Set<SSTableReader> activeSSTables = new HashSet<>();
             Set<String> activeSSTableNames = new HashSet<>();
-            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+            ColumnFamilyStore cfs = columnFamilyStores.get(tableId);
             for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
             {
                 if (repairedSSTables.contains(sstable.getFilename()))
@@ -689,14 +690,14 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                     activeSSTableNames.add(sstable.getFilename());
                 }
             }
-            sstableMap.put(cfId, activeSSTableNames);
+            sstableMap.put(tableId, activeSSTableNames);
             return activeSSTables;
         }
 
-        private void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+        private void addSSTables(TableId tableId, Collection<SSTableReader> sstables)
         {
             for (SSTableReader sstable : sstables)
-                sstableMap.get(cfId).add(sstable.getFilename());
+                sstableMap.get(tableId).add(sstable.getFilename());
         }