You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/18 16:06:05 UTC

[2/7] cassandra git commit: Move 2i metadata out of system_schema.columns and ColumnDefinition

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
new file mode 100644
index 0000000..af07cee
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+/**
+ * An immutable representation of secondary index metadata.
+ */
+public final class IndexMetadata
+{
+    public enum IndexType
+    {
+        KEYS, CUSTOM, COMPOSITES
+    }
+
+    public enum TargetType
+    {
+        COLUMN, ROW
+    }
+
+    public final String name;
+    public final IndexType indexType;
+    public final TargetType targetType;
+    public final Map<String, String> options;
+    public final Set<ColumnIdentifier> columns;
+
+    private IndexMetadata(String name,
+                          Map<String, String> options,
+                          IndexType indexType,
+                          TargetType targetType,
+                          Set<ColumnIdentifier> columns)
+    {
+        this.name = name;
+        this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
+        this.indexType = indexType;
+        this.targetType = targetType;
+        this.columns = columns == null ? ImmutableSet.of() : ImmutableSet.copyOf(columns);
+    }
+
+    public static IndexMetadata legacyIndex(ColumnIdentifier column,
+                                              String name,
+                                              IndexType type,
+                                              Map<String, String> options)
+    {
+        return new IndexMetadata(name, options, type, TargetType.COLUMN, Collections.singleton(column));
+    }
+
+    public static IndexMetadata legacyIndex(ColumnDefinition column,
+                                              String name,
+                                              IndexType type,
+                                              Map<String, String> options)
+    {
+        return legacyIndex(column.name, name, type, options);
+    }
+
+    public static boolean isNameValid(String name)
+    {
+        return name != null && !name.isEmpty() && name.matches("\\w+");
+    }
+
+    // these will go away as part of #9459 as we enable real per-row indexes
+    public static String getDefaultIndexName(String cfName, ColumnIdentifier columnName)
+    {
+        return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
+    }
+
+    public void validate()
+    {
+        if (!isNameValid(name))
+            throw new ConfigurationException("Illegal index name " + name);
+
+        if (indexType == null)
+            throw new ConfigurationException("Index type is null for index " + name);
+
+        if (targetType == null)
+            throw new ConfigurationException("Target type is null for index " + name);
+
+        if (indexType == IndexMetadata.IndexType.CUSTOM)
+            if (options == null || !options.containsKey(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME))
+                throw new ConfigurationException(String.format("Required option missing for index %s : %s",
+                                                               name, SecondaryIndex.CUSTOM_INDEX_OPTION_NAME));
+    }
+
+    public ColumnDefinition indexedColumn(CFMetaData cfm)
+    {
+       return cfm.getColumnDefinition(columns.iterator().next());
+    }
+
+    public boolean isCustom()
+    {
+        return indexType == IndexType.CUSTOM;
+    }
+
+    public boolean isKeys()
+    {
+        return indexType == IndexType.KEYS;
+    }
+
+    public boolean isComposites()
+    {
+        return indexType == IndexType.COMPOSITES;
+    }
+
+    public int hashCode()
+    {
+        return Objects.hashCode(name, indexType, targetType, options, columns);
+    }
+
+    public boolean equals(Object obj)
+    {
+        if (obj == this)
+            return true;
+
+        if (!(obj instanceof IndexMetadata))
+            return false;
+
+        IndexMetadata other = (IndexMetadata)obj;
+
+        return Objects.equal(name, other.name)
+            && Objects.equal(indexType, other.indexType)
+            && Objects.equal(targetType, other.targetType)
+            && Objects.equal(options, other.options)
+            && Objects.equal(columns, other.columns);
+    }
+
+    public String toString()
+    {
+        return new ToStringBuilder(this)
+            .append("name", name)
+            .append("indexType", indexType)
+            .append("targetType", targetType)
+            .append("columns", columns)
+            .append("options", options)
+            .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
new file mode 100644
index 0000000..7c930b3
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.util.*;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+
+import static com.google.common.collect.Iterables.filter;
+
+/**
+ * For backwards compatibility, in the first instance an IndexMetadata must have
+ * TargetType.COLUMN and its Set of target columns must contain only a single
+ * ColumnIdentifier. Hence, this is what is enforced by the public factory methods
+ * on IndexMetadata.
+ * These constraints, along with the internal datastructures here will be relaxed as
+ * support is added for multiple target columns per-index and for indexes with
+ * TargetType.ROW
+ */
+public class Indexes implements Iterable<IndexMetadata>
+{
+    // lookup for index by target column
+    private final ImmutableMap<ColumnIdentifier, IndexMetadata> indexes;
+
+    private Indexes(Builder builder)
+    {
+        ImmutableMap.Builder<ColumnIdentifier, IndexMetadata> internalBuilder = ImmutableMap.builder();
+        builder.indexes.build()
+                       .values()
+                       .stream()
+                       .forEach(def -> internalBuilder.put(def.columns.iterator().next(), def));
+        indexes = internalBuilder.build();
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Indexes none()
+    {
+        return builder().build();
+    }
+
+    public Iterator<IndexMetadata> iterator()
+    {
+        return indexes.values().iterator();
+    }
+
+    public int size()
+    {
+        return indexes.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return indexes.isEmpty();
+    }
+
+    /**
+     * Get the index with the specified name
+     *
+     * @param name a non-qualified index name
+     * @return an empty {@link Optional} if the named index is not found; a non-empty optional of {@link IndexMetadata} otherwise
+     */
+    public Optional<IndexMetadata> get(String name)
+    {
+        return indexes.values().stream().filter(def -> def.name.equals(name)).findFirst();
+    }
+
+    /**
+     * Answer true if contains an index with the specified name.
+     * @param name a non-qualified index name.
+     * @return true if the named index is found; false otherwise
+     */
+    public boolean has(String name)
+    {
+        return get(name).isPresent();
+    }
+
+    /**
+     * Get the index associated with the specified column. This may be removed or modified as support is added
+     * for indexes with multiple target columns and with TargetType.ROW
+     *
+     * @param column a column definition for which an {@link IndexMetadata} is being sought
+     * @return an empty {@link Optional} if the named index is not found; a non-empty optional of {@link IndexMetadata} otherwise
+     */
+    public Optional<IndexMetadata> get(ColumnDefinition column)
+    {
+        return Optional.ofNullable(indexes.get(column.name));
+    }
+
+    /**
+     * Answer true if an index is associated with the specified column.
+     * @param column
+     * @return
+     */
+    public boolean hasIndexFor(ColumnDefinition column)
+    {
+        return indexes.get(column.name) != null;
+    }
+
+    /**
+     * Create a SecondaryIndexes instance with the provided index added
+     */
+    public Indexes with(IndexMetadata index)
+    {
+        if (get(index.name).isPresent())
+            throw new IllegalStateException(String.format("Index %s already exists", index.name));
+
+        return builder().add(this).add(index).build();
+    }
+
+    /**
+     * Creates a SecondaryIndexes instance with the index with the provided name removed
+     */
+    public Indexes without(String name)
+    {
+        IndexMetadata index = get(name).orElseThrow(() -> new IllegalStateException(String.format("Index %s doesn't exist", name)));
+        return builder().add(filter(this, v -> v != index)).build();
+    }
+
+    /**
+     * Creates a SecondaryIndexes instance which contains an updated index definition
+     */
+    public Indexes replace(IndexMetadata index)
+    {
+        return without(index.name).with(index);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return this == o || (o instanceof Indexes && indexes.equals(((Indexes) o).indexes));
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return indexes.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return indexes.values().toString();
+    }
+
+    public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
+    {
+
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
+        Set<String> existingNames = ksm == null ? new HashSet<>() : ksm.existingIndexNames(null);
+        String baseName = IndexMetadata.getDefaultIndexName(cfName, columnName);
+        String acceptedName = baseName;
+        int i = 0;
+        while (existingNames.contains(acceptedName))
+            acceptedName = baseName + '_' + (++i);
+
+        return acceptedName;
+    }
+
+    public static final class Builder
+    {
+        final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+
+        private Builder()
+        {
+        }
+
+        public Indexes build()
+        {
+            return new Indexes(this);
+        }
+
+        public Builder add(IndexMetadata index)
+        {
+            indexes.put(index.name, index);
+            return this;
+        }
+
+        public Builder add(Iterable<IndexMetadata> indexes)
+        {
+            indexes.forEach(this::add);
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 0ce22fb..372ff6e 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -17,6 +17,10 @@
  */
 package org.apache.cassandra.schema;
 
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -78,6 +82,25 @@ public final class KeyspaceMetadata
         return new KeyspaceMetadata(name, params, tables, types, functions);
     }
 
+    public Set<String> existingIndexNames(String cfToExclude)
+    {
+        Set<String> indexNames = new HashSet<>();
+        for (CFMetaData table : tables)
+            if (cfToExclude == null || !table.cfName.equals(cfToExclude))
+                for (IndexMetadata index : table.getIndexes())
+                    indexNames.add(index.name);
+        return indexNames;
+    }
+
+    public Optional<CFMetaData> findIndexedTable(String indexName)
+    {
+        for (CFMetaData cfm : tables)
+            if (cfm.getIndexes().has(indexName))
+                return Optional.of(cfm);
+
+        return Optional.empty();
+    }
+
     @Override
     public int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 8dac03b..43f3dc3 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -121,7 +121,6 @@ public final class LegacySchemaMigrator
     private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace)
     {
         Mutation mutation = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp);
-
         for (Table table : keyspace.tables)
             SchemaKeyspace.addTableToSchemaMutation(table.metadata, table.timestamp, true, mutation);
 
@@ -293,6 +292,16 @@ public final class LegacySchemaMigrator
                                                                         isStaticCompactTable,
                                                                         needsUpgrade);
 
+        Indexes indexes = createIndexesFromColumnRows(columnRows,
+                                                      ksName,
+                                                      cfName,
+                                                      rawComparator,
+                                                      subComparator,
+                                                      isSuper,
+                                                      isCQLTable,
+                                                      isStaticCompactTable,
+                                                      needsUpgrade);
+
         if (needsUpgrade)
         {
             addDefinitionForUpgrade(columnDefs,
@@ -315,6 +324,7 @@ public final class LegacySchemaMigrator
                                            false, // legacy schema did not contain views
                                            columnDefs,
                                            DatabaseDescriptor.getPartitioner());
+        cfm.indexes(indexes);
 
         if (tableRow.has("dropped_columns"))
             addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance));
@@ -530,29 +540,73 @@ public final class LegacySchemaMigrator
             if (isEmptyCompactValueColumn(row))
                 continue;
 
-            ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
-            if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
-                kind = ColumnDefinition.Kind.STATIC;
+            columns.add(createColumnFromColumnRow(row,
+                                                  keyspace,
+                                                  table,
+                                                  rawComparator,
+                                                  rawSubComparator,
+                                                  isSuper,
+                                                  isCQLTable,
+                                                  isStaticCompactTable,
+                                                  needsUpgrade));
+        }
 
-            Integer componentIndex = null;
-            // Note that the component_index is not useful for non-primary key parts (it never really in fact since there is
-            // no particular ordering of non-PK columns, we only used to use it as a simplification but that's not needed
-            // anymore)
-            if (kind.isPrimaryKeyKind() && row.has("component_index"))
-                componentIndex = row.getInt("component_index");
+        return columns;
+    }
 
-            // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
-            // we need to use the comparator fromString method
-            AbstractType<?> comparator = isCQLTable
-                                       ? UTF8Type.instance
-                                       : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
-            ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
+    private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
+                                                              String keyspace,
+                                                              String table,
+                                                              AbstractType<?> rawComparator,
+                                                              AbstractType<?> rawSubComparator,
+                                                              boolean isSuper,
+                                                              boolean isCQLTable,
+                                                              boolean isStaticCompactTable,
+                                                              boolean needsUpgrade)
+    {
+        ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
+        if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
+            kind = ColumnDefinition.Kind.STATIC;
+
+        Integer componentIndex = null;
+        // Note that the component_index is not useful for non-primary key parts (it never really in fact since there is
+        // no particular ordering of non-PK columns, we only used to use it as a simplification but that's not needed
+        // anymore)
+        if (kind.isPrimaryKeyKind() && row.has("component_index"))
+            componentIndex = row.getInt("component_index");
 
-            AbstractType<?> validator = parseType(row.getString("validator"));
+        // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
+        // we need to use the comparator fromString method
+        AbstractType<?> comparator = isCQLTable
+                                     ? UTF8Type.instance
+                                     : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
+        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
 
-            IndexType indexType = null;
+        AbstractType<?> validator = parseType(row.getString("validator"));
+
+        return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
+    }
+
+    private static Indexes createIndexesFromColumnRows(UntypedResultSet rows,
+                                                       String keyspace,
+                                                       String table,
+                                                       AbstractType<?> rawComparator,
+                                                       AbstractType<?> rawSubComparator,
+                                                       boolean isSuper,
+                                                       boolean isCQLTable,
+                                                       boolean isStaticCompactTable,
+                                                       boolean needsUpgrade)
+    {
+        Indexes.Builder indexes = Indexes.builder();
+
+        for (UntypedResultSet.Row row : rows)
+        {
+            IndexMetadata.IndexType indexType = null;
             if (row.has("index_type"))
-                indexType = IndexType.valueOf(row.getString("index_type"));
+                indexType = IndexMetadata.IndexType.valueOf(row.getString("index_type"));
+
+            if (indexType == null)
+                continue;
 
             Map<String, String> indexOptions = null;
             if (row.has("index_options"))
@@ -562,10 +616,20 @@ public final class LegacySchemaMigrator
             if (row.has("index_name"))
                 indexName = row.getString("index_name");
 
-            columns.add(new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind));
+            ColumnDefinition column = createColumnFromColumnRow(row,
+                                                                keyspace,
+                                                                table,
+                                                                rawComparator,
+                                                                rawSubComparator,
+                                                                isSuper,
+                                                                isCQLTable,
+                                                                isStaticCompactTable,
+                                                                needsUpgrade);
+
+            indexes.add(IndexMetadata.legacyIndex(column, indexName, indexType, indexOptions));
         }
 
-        return columns;
+        return indexes.build();
     }
 
     private static ColumnDefinition.Kind deserializeKind(String kind)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4f3fe93..f77b2bc 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -24,6 +24,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.MapDifference;
@@ -48,8 +49,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
-import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
-import static org.apache.cassandra.utils.FBUtilities.json;
 
 /**
  * system_schema.* tables and methods for manipulating them.
@@ -73,9 +72,10 @@ public final class SchemaKeyspace
     public static final String TYPES = "types";
     public static final String FUNCTIONS = "functions";
     public static final String AGGREGATES = "aggregates";
+    public static final String INDEXES = "indexes";
 
     public static final List<String> ALL =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES);
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
     private static final CFMetaData Keyspaces =
         compile(KEYSPACES,
@@ -119,9 +119,6 @@ public final class SchemaKeyspace
                 + "column_name text,"
                 + "column_name_bytes blob,"
                 + "component_index int,"
-                + "index_name text,"
-                + "index_options text,"
-                + "index_type text,"
                 + "type text,"
                 + "validator text,"
                 + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
@@ -159,6 +156,19 @@ public final class SchemaKeyspace
                 + "included_columns list<text>,"
                 + "PRIMARY KEY ((keyspace_name), table_name, view_name))");
 
+    private static final CFMetaData Indexes =
+        compile(INDEXES,
+                "secondary index definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "index_name text,"
+                + "index_type text,"
+                + "options map<text, text>,"
+                + "target_columns set<text>,"
+                + "target_type text,"
+                + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
+
     private static final CFMetaData Types =
         compile(TYPES,
                 "user defined type definitions",
@@ -199,8 +209,8 @@ public final class SchemaKeyspace
                 + "state_type text,"
                 + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
 
-    public static final List<CFMetaData> All =
-        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates);
+    public static final List<CFMetaData> ALL_TABLE_METADATA =
+        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates, Indexes);
 
     private static CFMetaData compile(String name, String description, String schema)
     {
@@ -211,7 +221,7 @@ public final class SchemaKeyspace
 
     public static KeyspaceMetadata metadata()
     {
-        return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), org.apache.cassandra.schema.Tables.of(All));
+        return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), org.apache.cassandra.schema.Tables.of(ALL_TABLE_METADATA));
     }
 
     /**
@@ -699,7 +709,7 @@ public final class SchemaKeyspace
         int nowInSec = FBUtilities.nowInSeconds();
         Mutation mutation = new Mutation(NAME, Keyspaces.decorateKey(getSchemaKSKey(keyspace.name)));
 
-        for (CFMetaData schemaTable : All)
+        for (CFMetaData schemaTable : ALL_TABLE_METADATA)
             mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
 
         return mutation;
@@ -836,6 +846,9 @@ public final class SchemaKeyspace
 
             for (MaterializedViewDefinition materializedView: table.getMaterializedViews())
                 addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
+
+            for (IndexMetadata index : table.getIndexes())
+                addIndexToSchemaMutation(table, index, timestamp, mutation);
         }
     }
 
@@ -921,17 +934,45 @@ public final class SchemaKeyspace
 
         // newly created materialized views
         for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values())
-            addMaterializedViewToSchemaMutation(oldTable, materializedView, timestamp, mutation);
+            addMaterializedViewToSchemaMutation(newTable, materializedView, timestamp, mutation);
 
         // updated materialized views need to be updated
         for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values())
         {
-            addUpdatedMaterializedViewDefinitionToSchemaMutation(oldTable, diff.rightValue(), timestamp, mutation);
+            addUpdatedMaterializedViewDefinitionToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
+        }
+
+        MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(),
+                                                                       newTable.getIndexes());
+
+        // dropped indexes
+        for (IndexMetadata index : indexesDiff.entriesOnlyOnLeft().values())
+            dropIndexFromSchemaMutation(oldTable, index, timestamp, mutation);
+
+        // newly created indexes
+        for (IndexMetadata index : indexesDiff.entriesOnlyOnRight().values())
+            addIndexToSchemaMutation(newTable, index, timestamp, mutation);
+
+        // updated indexes need to be updated
+        for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values())
+        {
+            addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
         }
 
         return mutation;
     }
 
+    private static MapDifference<String, IndexMetadata> indexesDiff(Indexes before, Indexes after)
+    {
+        Map<String, IndexMetadata> beforeMap = new HashMap<>();
+        before.forEach(i -> beforeMap.put(i.name, i));
+
+        Map<String, IndexMetadata> afterMap = new HashMap<>();
+        after.forEach(i -> afterMap.put(i.name, i));
+
+        return Maps.difference(beforeMap, afterMap);
+    }
+
     private static MapDifference<String, TriggerMetadata> triggersDiff(Triggers before, Triggers after)
     {
         Map<String, TriggerMetadata> beforeMap = new HashMap<>();
@@ -970,6 +1011,9 @@ public final class SchemaKeyspace
         for (MaterializedViewDefinition materializedView : table.getMaterializedViews())
             dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation);
 
+        for (IndexMetadata index : table.getIndexes())
+            dropIndexFromSchemaMutation(table, index, timestamp, mutation);
+
         return mutation;
     }
 
@@ -1037,9 +1081,18 @@ public final class SchemaKeyspace
         MaterializedViews views =
             readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition);
 
-        return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
-                                                              .triggers(triggers)
-                                                              .materializedViews(views);
+        CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
+                                                                        .triggers(triggers)
+                                                                        .materializedViews(views);
+
+        // the CFMetaData itself is required to build the collection of indexes as
+        // the column definitions are needed because we store only the name each
+        // index's target columns and this is not enough to reconstruct a ColumnIdentifier
+        org.apache.cassandra.schema.Indexes indexes =
+            readSchemaPartitionForTableAndApply(INDEXES, keyspace, table, rowIterator -> createIndexesFromIndexesPartition(cfm, rowIterator));
+        cfm.indexes(indexes);
+
+        return cfm;
     }
 
     public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
@@ -1107,9 +1160,6 @@ public final class SchemaKeyspace
              .add("validator", column.type.toString())
              .add("type", column.kind.toString().toLowerCase())
              .add("component_index", column.isOnAllComponents() ? null : column.position())
-             .add("index_name", column.getIndexName())
-             .add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString())
-             .add("index_options", json(column.getIndexOptions()))
              .build();
     }
 
@@ -1141,19 +1191,7 @@ public final class SchemaKeyspace
 
         AbstractType<?> validator = parseType(row.getString("validator"));
 
-        IndexType indexType = null;
-        if (row.has("index_type"))
-            indexType = IndexType.valueOf(row.getString("index_type"));
-
-        Map<String, String> indexOptions = null;
-        if (row.has("index_options"))
-            indexOptions = fromJsonMap(row.getString("index_options"));
-
-        String indexName = null;
-        if (row.has("index_name"))
-            indexName = row.getString("index_name");
-
-        return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind);
+        return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
     }
 
     /*
@@ -1233,7 +1271,7 @@ public final class SchemaKeyspace
     }
 
     /*
-     * Global Index metadata serialization/deserialization.
+     * Materialized View metadata serialization/deserialization.
      */
 
     private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
@@ -1330,6 +1368,100 @@ public final class SchemaKeyspace
     }
 
     /*
+     * Secondary Index metadata serialization/deserialization.
+     */
+
+    private static void addIndexToSchemaMutation(CFMetaData table,
+                                                 IndexMetadata index,
+                                                 long timestamp,
+                                                 Mutation mutation)
+    {
+        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation)
+                                   .clustering(table.cfName, index.name);
+
+        builder.add("index_type", index.indexType.toString());
+        builder.map("options", index.options);
+        builder.set("target_columns", index.columns.stream()
+                                                   .map(ColumnIdentifier::toString)
+                                                   .collect(Collectors.toSet()));
+        builder.add("target_type", index.targetType.toString());
+        builder.build();
+    }
+
+    private static void dropIndexFromSchemaMutation(CFMetaData table,
+                                                    IndexMetadata index,
+                                                    long timestamp,
+                                                    Mutation mutation)
+    {
+        RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name);
+    }
+
+    private static void addUpdatedIndexToSchemaMutation(CFMetaData table,
+                                                        IndexMetadata index,
+                                                        long timestamp,
+                                                        Mutation mutation)
+    {
+        RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
+
+        builder.add("index_type", index.indexType.toString());
+        builder.map("options", index.options);
+        builder.set("target_columns", index.columns.stream().map(ColumnIdentifier::toString).collect(Collectors.toSet()));
+        builder.add("target_type", index.targetType.toString());
+        builder.build();
+    }
+    /**
+     * Deserialize secondary indexes from storage-level representation.
+     *
+     * @param partition storage-level partition containing the index definitions
+     * @return the list of processed IndexMetadata
+     */
+    private static Indexes createIndexesFromIndexesPartition(CFMetaData cfm, RowIterator partition)
+    {
+        Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
+        String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
+        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(cfm, row)));
+        return indexes.build();
+    }
+
+    private static IndexMetadata createIndexMetadataFromIndexesRow(CFMetaData cfm, UntypedResultSet.Row row)
+    {
+        String name = row.getString("index_name");
+        IndexMetadata.IndexType type = IndexMetadata.IndexType.valueOf(row.getString("index_type"));
+        IndexMetadata.TargetType targetType = IndexMetadata.TargetType.valueOf(row.getString("target_type"));
+        Map<String, String> options = row.getTextMap("options");
+        if (options == null)
+            options = Collections.emptyMap();
+
+        Set<String> targetColumnNames = row.getSet("target_columns", UTF8Type.instance);
+        assert targetType == IndexMetadata.TargetType.COLUMN : "Per row indexes with dynamic target columns are not supported yet";
+        assert targetColumnNames.size() == 1 : "Secondary indexes targetting multiple columns are not supported yet";
+
+        Set<ColumnIdentifier> targetColumns = new HashSet<>();
+        // if it's not a CQL table, we can't assume that the column name is utf8, so
+        // in that case we have to do a linear scan of the cfm's columns to get the matching one
+        if (targetColumnNames != null)
+        {
+            targetColumnNames.forEach(targetColumnName -> {
+                if (cfm.isCQLTable())
+                    targetColumns.add(ColumnIdentifier.getInterned(targetColumnName, true));
+                else
+                    findColumnIdentifierWithName(targetColumnName, cfm.allColumns()).ifPresent(targetColumns::add);
+            });
+        }
+        return IndexMetadata.legacyIndex(targetColumns.iterator().next(), name, type, options);
+    }
+
+    private static Optional<ColumnIdentifier> findColumnIdentifierWithName(String name,
+                                                                           Iterable<ColumnDefinition> columns)
+    {
+        for (ColumnDefinition column : columns)
+            if (column.name.toString().equals(name))
+                return Optional.of(column.name);
+
+        return Optional.empty();
+    }
+
+    /*
      * UDF metadata serialization/deserialization.
      */
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index cb0667a..7a2126c 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1857,7 +1857,6 @@ public class CassandraServer implements Cassandra.Iface
             cf_def.unsetId(); // explicitly ignore any id set by client (Hector likes to set zero)
             CFMetaData cfm = ThriftConversion.fromThrift(cf_def);
             cfm.params.compaction.validate();
-            cfm.addDefaultIndexNames();
 
             if (!cfm.getTriggers().isEmpty())
                 state().ensureIsSuper("Only superusers are allowed to add triggers.");
@@ -1921,7 +1920,6 @@ public class CassandraServer implements Cassandra.Iface
             {
                 cf_def.unsetId(); // explicitly ignore any id set by client (same as system_add_column_family)
                 CFMetaData cfm = ThriftConversion.fromThrift(cf_def);
-                cfm.addDefaultIndexNames();
 
                 if (!cfm.getTriggers().isEmpty())
                     state().ensureIsSuper("Only superusers are allowed to add triggers.");
@@ -2007,7 +2005,6 @@ public class CassandraServer implements Cassandra.Iface
 
             CFMetaData cfm = ThriftConversion.fromThriftForUpdate(cf_def, oldCfm);
             cfm.params.compaction.validate();
-            cfm.addDefaultIndexNames();
 
             if (!oldCfm.getTriggers().equals(cfm.getTriggers()))
                 state().ensureIsSuper("Only superusers are allowed to add or remove triggers.");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 1744177..a74bcea 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.thrift;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -278,7 +279,23 @@ public class ThriftConversion
             // We do not allow Thrift materialized views, so we always set it to false
             boolean isMaterializedView = false;
 
-            CFMetaData newCFMD = CFMetaData.create(cf_def.keyspace, cf_def.name, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, defs, DatabaseDescriptor.getPartitioner());
+            CFMetaData newCFMD = CFMetaData.create(cf_def.keyspace,
+                                                   cf_def.name,
+                                                   cfId,
+                                                   isDense,
+                                                   isCompound,
+                                                   isSuper,
+                                                   isCounter,
+                                                   isMaterializedView,
+                                                   defs,
+                                                   DatabaseDescriptor.getPartitioner());
+
+            // Convert any secondary indexes defined in the thrift column_metadata
+            newCFMD.indexes(indexDefsFromThrift(cf_def.keyspace,
+                                                cf_def.name,
+                                                rawComparator,
+                                                subComparator,
+                                                cf_def.column_metadata));
 
             if (cf_def.isSetGc_grace_seconds())
                 newCFMD.gcGraceSeconds(cf_def.gc_grace_seconds);
@@ -492,9 +509,6 @@ public class ThriftConversion
                                     cfName,
                                     ColumnIdentifier.getInterned(ByteBufferUtil.clone(thriftColumnDef.name), comparator),
                                     TypeParser.parse(thriftColumnDef.validation_class),
-                                    thriftColumnDef.index_type == null ? null : org.apache.cassandra.config.IndexType.valueOf(thriftColumnDef.index_type.name()),
-                                    thriftColumnDef.index_options,
-                                    thriftColumnDef.index_name,
                                     null,
                                     kind);
     }
@@ -516,16 +530,58 @@ public class ThriftConversion
         return defs;
     }
 
+    private static Indexes indexDefsFromThrift(String ksName,
+                                               String cfName,
+                                               AbstractType<?> thriftComparator,
+                                               AbstractType<?> thriftSubComparator,
+                                               List<ColumnDef> thriftDefs)
+    {
+        if (thriftDefs == null)
+            return Indexes.none();
+
+        Set<String> indexNames = new HashSet<>();
+        Indexes.Builder indexes = Indexes.builder();
+        for (ColumnDef def : thriftDefs)
+        {
+            if (def.isSetIndex_type())
+            {
+                ColumnDefinition column = fromThrift(ksName, cfName, thriftComparator, thriftSubComparator, def);
+
+                String indexName = def.getIndex_name();
+                // add a generated index name if none was supplied
+                if (Strings.isNullOrEmpty(indexName))
+                    indexName = Indexes.getAvailableIndexName(ksName, cfName, column.name);
+
+                if (indexNames.contains(indexName))
+                    throw new ConfigurationException("Duplicate index name " + indexName);
+
+                indexNames.add(indexName);
+
+                Map<String, String> indexOptions = def.getIndex_options();
+                IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(def.index_type.name());
+
+                indexes.add(IndexMetadata.legacyIndex(column,
+                                                      indexName,
+                                                      indexType,
+                                                      indexOptions));
+            }
+        }
+        return indexes.build();
+    }
+
     @VisibleForTesting
-    public static ColumnDef toThrift(ColumnDefinition column)
+    public static ColumnDef toThrift(CFMetaData cfMetaData, ColumnDefinition column)
     {
         ColumnDef cd = new ColumnDef();
 
         cd.setName(ByteBufferUtil.clone(column.name.bytes));
         cd.setValidation_class(column.type.toString());
-        cd.setIndex_type(column.getIndexType() == null ? null : org.apache.cassandra.thrift.IndexType.valueOf(column.getIndexType().name()));
-        cd.setIndex_name(column.getIndexName());
-        cd.setIndex_options(column.getIndexOptions() == null ? null : Maps.newHashMap(column.getIndexOptions()));
+        Optional<IndexMetadata> index = cfMetaData.getIndexes().get(column);
+        index.ifPresent(def -> {
+            cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(def.indexType.name()));
+            cd.setIndex_name(def.name);
+            cd.setIndex_options(def.options == null || def.options.isEmpty() ? null : Maps.newHashMap(def.options));
+        });
 
         return cd;
     }
@@ -535,7 +591,7 @@ public class ThriftConversion
         List<ColumnDef> thriftDefs = new ArrayList<>(columns.size());
         for (ColumnDefinition def : columns)
             if (def.isPartOfCellName(metadata.isCQLTable(), metadata.isSuper()))
-                thriftDefs.add(ThriftConversion.toThrift(def));
+                thriftDefs.add(ThriftConversion.toThrift(metadata, def));
         return thriftDefs;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index fb65e34..f862816 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -158,6 +158,6 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
 
         AbstractType<?> validator = TypeParser.parse(row.getString("validator"));
 
-        return new ColumnDefinition(keyspace, table, name, validator, null, null, null, componentIndex, kind);
+        return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index e914f13..1c0dd76 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -34,13 +34,8 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.*;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.schema.CachingParams;
-import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -280,9 +275,6 @@ public class SchemaLoader
                                     ColumnIdentifier.getInterned(IntegerType.instance.fromString("42"), IntegerType.instance),
                                     UTF8Type.instance,
                                     null,
-                                    null,
-                                    null,
-                                    null,
                                     ColumnDefinition.Kind.REGULAR);
     }
 
@@ -293,9 +285,6 @@ public class SchemaLoader
                                     ColumnIdentifier.getInterned("fortytwo", true),
                                     UTF8Type.instance,
                                     null,
-                                    null,
-                                    null,
-                                    null,
                                     ColumnDefinition.Kind.REGULAR);
     }
 
@@ -309,8 +298,16 @@ public class SchemaLoader
                 .addPartitionKey("key", AsciiType.instance)
                 .build();
 
-        return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(ksName, cfName, "indexed", AsciiType.instance)
-                                                                .setIndex("indexe1", IndexType.CUSTOM, indexOptions));
+        ColumnDefinition indexedColumn = ColumnDefinition.regularDef(ksName, cfName, "indexed", AsciiType.instance);
+        cfm.addOrReplaceColumnDefinition(indexedColumn);
+
+        cfm.indexes(
+            cfm.getIndexes()
+               .with(IndexMetadata.legacyIndex(indexedColumn,
+                                               "indexe1",
+                                               IndexMetadata.IndexType.CUSTOM,
+                                               indexOptions)));
+        return cfm;
     }
 
     private static void useCompression(List<KeyspaceMetadata> schema)
@@ -415,8 +412,12 @@ public class SchemaLoader
                 .build();
 
         if (withIndex)
-            cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true))
-               .setIndex("birthdate_key_index", IndexType.COMPOSITES, Collections.EMPTY_MAP);
+            cfm.indexes(
+                cfm.getIndexes()
+                    .with(IndexMetadata.legacyIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                                                    "birthdate_key_index",
+                                                    IndexMetadata.IndexType.COMPOSITES,
+                                                    Collections.EMPTY_MAP)));
 
         return cfm.compression(getCompressionParameters());
     }
@@ -431,8 +432,13 @@ public class SchemaLoader
                                            .build();
 
         if (withIndex)
-            cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true))
-               .setIndex("birthdate_composite_index", IndexType.KEYS, Collections.EMPTY_MAP);
+            cfm.indexes(
+                cfm.getIndexes()
+                    .with(IndexMetadata.legacyIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                                                    "birthdate_composite_index",
+                                                    IndexMetadata.IndexType.KEYS,
+                                                    Collections.EMPTY_MAP)));
+
 
         return cfm.compression(getCompressionParameters());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java b/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
index c875165..933d231 100644
--- a/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
+++ b/test/unit/org/apache/cassandra/config/ColumnDefinitionTest.java
@@ -38,9 +38,7 @@ public class ColumnDefinitionTest
                          .addRegularColumn("val", AsciiType.instance)
                          .build();
 
-        ColumnDefinition cd0 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinitionName0"), BytesType.instance)
-                                               .setIndex("random index name 0", IndexType.KEYS, null);
-
+        ColumnDefinition cd0 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinitionName0"), BytesType.instance);
         ColumnDefinition cd1 = ColumnDefinition.staticDef(cfm, ByteBufferUtil.bytes("TestColumnDefinition1"), LongType.instance);
 
         testSerializeDeserialize(cfm, cd0);
@@ -49,7 +47,7 @@ public class ColumnDefinitionTest
 
     protected void testSerializeDeserialize(CFMetaData cfm, ColumnDefinition cd) throws Exception
     {
-        ColumnDefinition newCd = ThriftConversion.fromThrift(cfm.ksName, cfm.cfName, cfm.comparator.subtype(0), null, ThriftConversion.toThrift(cd));
+        ColumnDefinition newCd = ThriftConversion.fromThrift(cfm.ksName, cfm.cfName, cfm.comparator.subtype(0), null, ThriftConversion.toThrift(cfm, cd));
         Assert.assertNotSame(cd, newCd);
         Assert.assertEquals(cd.hashCode(), newCd.hashCode());
         Assert.assertEquals(cd, newCd);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 2bd390f..e540ec5 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -670,6 +670,11 @@ public abstract class CQLTester
 
     protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows)
     {
+        // necessary as we need cluster objects to supply CodecRegistry.
+        // It's reasonably certain that the network setup has already been done
+        // by the time we arrive at this point, but adding this check doesn't hurt
+        requireNetwork();
+
         if (result == null)
         {
             if (rows.length > 0)
@@ -692,19 +697,21 @@ public abstract class CQLTester
             for (int j = 0; j < meta.size(); j++)
             {
                 DataType type = meta.getType(j);
-                ByteBuffer expectedByteValue = type.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion));
+                com.datastax.driver.core.TypeCodec<Object> codec = cluster[protocolVersion -1].getConfiguration()
+                                                                                              .getCodecRegistry()
+                                                                                              .codecFor(type);
+                ByteBuffer expectedByteValue = codec.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion));
                 int expectedBytes = expectedByteValue.remaining();
                 ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
                 int actualBytes = actualValue.remaining();
-
                 if (!Objects.equal(expectedByteValue, actualValue))
                     Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), " +
                                               "expected <%s> (%d bytes) but got <%s> (%d bytes) " +
                                               "(using protocol version %d)",
                                               i, j, meta.getName(j), type,
-                                              type.format(expected[j]),
+                                              codec.format(expected[j]),
                                               expectedBytes,
-                                              type.format(type.deserialize(actualValue, ProtocolVersion.fromInt(protocolVersion))),
+                                              codec.format(codec.deserialize(actualValue, ProtocolVersion.fromInt(protocolVersion))),
                                               actualBytes,
                                               protocolVersion));
             }
@@ -1228,6 +1235,12 @@ public abstract class CQLTester
         return m;
     }
 
+    protected com.datastax.driver.core.TupleType tupleTypeOf(int protocolVersion, DataType...types)
+    {
+        requireNetwork();
+        return cluster[protocolVersion -1].getMetadata().newTupleType(types);
+    }
+
     // Attempt to find an AbstracType from a value (for serialization/printing sake).
     // Will work as long as we use types we know of, which is good enough for testing
     private static AbstractType typeFor(Object value)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
index 35d57d9..0d590b2 100644
--- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java
@@ -78,7 +78,7 @@ public class IndexQueryPagingTest extends CQLTester
         // that all rows are returned, so we know that paging
         // of the results was involved.
         Session session = sessionNet(maxProtocolVersion);
-        Statement stmt = new SimpleStatement(String.format(cql, KEYSPACE + "." + currentTable()));
+        Statement stmt = session.newSimpleStatement(String.format(cql, KEYSPACE + "." + currentTable()));
         stmt.setFetchSize(rowCount - 1);
         assertEquals(rowCount, session.execute(stmt).all().size());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
index 1563197..f43e335 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java
@@ -183,10 +183,15 @@ public class UFPureScriptTest extends CQLTester
                    row(map));
 
         // same test - but via native protocol
-        TupleType tType = TupleType.of(DataType.cdouble(),
-                                       DataType.list(DataType.cdouble()),
-                                       DataType.set(DataType.text()),
-                                       DataType.map(DataType.cint(), DataType.cboolean()));
+        // we use protocol V3 here to encode the expected version because the server
+        // always serializes Collections using V3 - see CollectionSerializer's
+        // serialize and deserialize methods.
+        TupleType tType = tupleTypeOf(Server.VERSION_3,
+                                      DataType.cdouble(),
+                                      DataType.list(DataType.cdouble()),
+                                      DataType.set(DataType.text()),
+                                      DataType.map(DataType.cint(),
+                                                   DataType.cboolean()));
         TupleValue tup = tType.newValue(1d, list, set, map);
         for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 2d9c540..6bd03ad 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -29,6 +29,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.security.AccessControlException;
 
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -1157,10 +1158,15 @@ public class UFTest extends CQLTester
         assertRows(execute("SELECT " + fTup4 + "(tup) FROM %s WHERE key = 1"),
                    row(map));
 
-        TupleType tType = TupleType.of(DataType.cdouble(),
-                                       DataType.list(DataType.cdouble()),
-                                       DataType.set(DataType.text()),
-                                       DataType.map(DataType.cint(), DataType.cboolean()));
+        // same test - but via native protocol
+        // we use protocol V3 here to encode the expected version because the server
+        // always serializes Collections using V3 - see CollectionSerializer's
+        // serialize and deserialize methods.
+        TupleType tType = tupleTypeOf(Server.VERSION_3,
+                                      DataType.cdouble(),
+                                      DataType.list(DataType.cdouble()),
+                                      DataType.set(DataType.text()),
+                                      DataType.map(DataType.cint(), DataType.cboolean()));
         TupleValue tup = tType.newValue(1d, list, set, map);
         for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 8c79689..568e23d 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -95,8 +95,8 @@ public class BatchlogManagerTest
         InetAddress localhost = InetAddress.getByName("127.0.0.1");
         metadata.updateNormalToken(Util.token("A"), localhost);
         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
-        Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.Batches.cfId).truncateBlocking();
-        Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.LegacyBatchlog.cfId).truncateBlocking();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index e0786f9..cb38e37 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -30,16 +30,15 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.utils.Pair;
@@ -162,9 +161,12 @@ public class DirectoriesTest
                                   .addPartitionKey("thekey", UTF8Type.instance)
                                   .addClusteringColumn("col", UTF8Type.instance)
                                   .build();
-        ColumnDefinition def = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
-        def.setIndex("idx", IndexType.KEYS, Collections.emptyMap());
-        CFMetaData INDEX_CFM = SecondaryIndex.newIndexMetadata(PARENT_CFM, def);
+        IndexMetadata indexDef = IndexMetadata.legacyIndex(PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col")),
+                                                           "idx",
+                                                           IndexMetadata.IndexType.KEYS,
+                                                           Collections.emptyMap());
+        PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
+        CFMetaData INDEX_CFM = SecondaryIndex.newIndexMetadata(PARENT_CFM, indexDef);
         Directories parentDirectories = new Directories(PARENT_CFM);
         Directories indexDirectories = new Directories(INDEX_CFM);
         // secondary index has its own directory

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 1538665..70fbfe3 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -68,7 +69,12 @@ public class RangeTombstoneTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KSNAME,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KSNAME, CFNAME, 0, UTF8Type.instance, Int32Type.instance, Int32Type.instance));
+                                    SchemaLoader.standardCFMD(KSNAME,
+                                                              CFNAME,
+                                                              0,
+                                                              UTF8Type.instance,
+                                                              Int32Type.instance,
+                                                              Int32Type.instance));
     }
 
     @Test
@@ -453,7 +459,49 @@ public class RangeTombstoneTest
     @Test
     public void testRowWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
     {
-        runCompactionWithRangeTombstoneAndCheckSecondaryIndex();
+        Keyspace table = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+        ByteBuffer key = ByteBufferUtil.bytes("k5");
+        ByteBuffer indexedColumnName = ByteBufferUtil.bytes("val");
+
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
+        IndexMetadata indexDef = IndexMetadata.legacyIndex(cd,
+                                                           "test_index",
+                                                           IndexMetadata.IndexType.CUSTOM,
+                                                           ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
+                                                                           TestIndex.class.getName()));
+
+        if (!cfs.metadata.getIndexes().get("test_index").isPresent())
+            cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
+
+        Future<?> rebuild = cfs.indexManager.addIndexedColumn(indexDef);
+        // If rebuild there is, wait for the rebuild to finish so it doesn't race with the following insertions
+        if (rebuild != null)
+            rebuild.get();
+
+        TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(cd));
+        index.resetCounts();
+
+        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
+        for (int i = 0; i < 10; i++)
+            builder.newRow(i).add("val", i);
+        builder.applyUnsafe();
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata, 0, key).addRangeTombstone(0, 7).build().applyUnsafe();
+        cfs.forceBlockingFlush();
+
+        assertEquals(10, index.inserts.size());
+
+        CompactionManager.instance.performMaximal(cfs, false);
+
+        // compacted down to single sstable
+        assertEquals(1, cfs.getLiveSSTables().size());
+
+        assertEquals(8, index.deletes.size());
     }
 
     @Test
@@ -512,12 +560,19 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        cd.setIndex("test_index", IndexType.CUSTOM, ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()));
-        Future<?> rebuild = cfs.indexManager.addIndexedColumn(cd);
+        IndexMetadata indexDef = IndexMetadata.legacyIndex(cd,
+                                                           "test_index",
+                                                           IndexMetadata.IndexType.CUSTOM,
+                                                           ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
+                                                                           TestIndex.class.getName()));
+
+        if (!cfs.metadata.getIndexes().get("test_index").isPresent())
+            cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
+
+        Future<?> rebuild = cfs.indexManager.addIndexedColumn(indexDef);
         // If rebuild there is, wait for the rebuild to finish so it doesn't race with the following insertions
         if (rebuild != null)
             rebuild.get();
-
         TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(cd));
         index.resetCounts();
 
@@ -537,45 +592,6 @@ public class RangeTombstoneTest
         assertEquals(1, index.updates.size());
     }
 
-    private void runCompactionWithRangeTombstoneAndCheckSecondaryIndex() throws Exception
-    {
-        Keyspace table = Keyspace.open(KSNAME);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
-        ByteBuffer key = ByteBufferUtil.bytes("k5");
-        ByteBuffer indexedColumnName = ByteBufferUtil.bytes("val");
-
-        cfs.truncateBlocking();
-        cfs.disableAutoCompaction();
-
-        ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        cd.setIndex("test_index", IndexType.CUSTOM, ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()));
-        Future<?> rebuild = cfs.indexManager.addIndexedColumn(cd);
-        // If rebuild there is, wait for the rebuild to finish so it doesn't race with the following insertions
-        if (rebuild != null)
-            rebuild.get();
-
-        TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(cd));
-        index.resetCounts();
-
-        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0);
-        for (int i = 0; i < 10; i++)
-            builder.newRow(i).add("val", i);
-        builder.applyUnsafe();
-        cfs.forceBlockingFlush();
-
-        new RowUpdateBuilder(cfs.metadata, 0, key).addRangeTombstone(0, 7).build().applyUnsafe();
-        cfs.forceBlockingFlush();
-
-        assertEquals(10, index.inserts.size());
-
-        CompactionManager.instance.performMaximal(cfs, false);
-
-        // compacted down to single sstable
-        assertEquals(1, cfs.getLiveSSTables().size());
-
-        assertEquals(8, index.deletes.size());
-    }
-
     private static ByteBuffer bb(int i)
     {
         return ByteBufferUtil.bytes(i);
@@ -621,7 +637,7 @@ public class RangeTombstoneTest
 
         public void reload(){}
 
-        public void validateOptions() throws ConfigurationException{}
+        public void validateOptions(CFMetaData cfm, IndexMetadata def) throws ConfigurationException{}
 
         public String getIndexName(){ return "TestIndex";}
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index 0974fc4..115b13c 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -40,6 +39,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -60,8 +60,8 @@ public class SecondaryIndexTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, WITH_COMPOSITE_INDEX, true).gcGraceSeconds(0),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, COMPOSITE_INDEX_TO_BE_ADDED, false).gcGraceSeconds(0),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, WITH_COMPOSITE_INDEX, true) .gcGraceSeconds(0),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, COMPOSITE_INDEX_TO_BE_ADDED, false) .gcGraceSeconds(0),
                                     SchemaLoader.keysIndexCFMD(KEYSPACE1, WITH_KEYS_INDEX, true).gcGraceSeconds(0));
     }
 
@@ -426,8 +426,12 @@ public class SecondaryIndexTest
         new RowUpdateBuilder(cfs.metadata, 0, "k1").clustering("c").add("birthdate", 1L).build().applyUnsafe();
 
         ColumnDefinition old = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        old.setIndex("birthdate_index", IndexType.COMPOSITES, Collections.EMPTY_MAP);
-        Future<?> future = cfs.indexManager.addIndexedColumn(old);
+        IndexMetadata indexDef = IndexMetadata.legacyIndex(old,
+                                                           "birthdate_index",
+                                                           IndexMetadata.IndexType.COMPOSITES,
+                                                           Collections.EMPTY_MAP);
+        cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
+        Future<?> future = cfs.indexManager.addIndexedColumn(indexDef);
         future.get();
         // we had a bug (CASSANDRA-2244) where index would get created but not flushed -- check for that
         ColumnDefinition cDef = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
@@ -441,7 +445,7 @@ public class SecondaryIndexTest
         assert !indexedCfs.isIndexBuilt(ByteBufferUtil.bytes("birthdate"));
 
         // rebuild & re-query
-        future = cfs.indexManager.addIndexedColumn(cDef);
+        future = cfs.indexManager.addIndexedColumn(indexDef);
         future.get();
         assertIndexedOne(cfs, ByteBufferUtil.bytes("birthdate"), 1L);
     }
@@ -477,8 +481,9 @@ public class SecondaryIndexTest
         if (count != 0)
             assertTrue(searchers.size() > 0);
 
-        try (ReadOrderGroup orderGroup = rc.startOrderGroup(); PartitionIterator iter = UnfilteredPartitionIterators.filter(searchers.get(0).search(rc, orderGroup), FBUtilities.nowInSeconds()))
-        {
+        try (ReadOrderGroup orderGroup = rc.startOrderGroup();
+             PartitionIterator iter = UnfilteredPartitionIterators.filter(searchers.get(0).search(rc, orderGroup),
+                                                                          FBUtilities.nowInSeconds())) {
             assertEquals(count, Util.size(iter));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 77de665..8996ded 100644
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -205,7 +206,7 @@ public class PerRowSecondaryIndexTest
         }
 
         @Override
-        public void validateOptions() throws ConfigurationException
+        public void validateOptions(CFMetaData cfm, IndexMetadata def) throws ConfigurationException{}
         {
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index 98a954c..c10865a 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -34,13 +34,13 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.lifecycle.TransactionLog;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -101,7 +101,7 @@ public class DefsTest
         for (int i = 0; i < 5; i++)
         {
             ByteBuffer name = ByteBuffer.wrap(new byte[] { (byte)i });
-            cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, name, BytesType.instance).setIndex(Integer.toString(i), IndexType.KEYS, null));
+            cfm.addColumnDefinition(ColumnDefinition.regularDef(cfm, name, BytesType.instance));
         }
 
         cfm.comment("No comment")
@@ -116,13 +116,11 @@ public class DefsTest
         CFMetaData cfNew = cfm.copy();
 
         // add one.
-        ColumnDefinition addIndexDef = ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(new byte[] { 5 }), BytesType.instance)
-                                                       .setIndex("5", IndexType.KEYS, null);
+        ColumnDefinition addIndexDef = ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(new byte[] { 5 }), BytesType.instance);
         cfNew.addColumnDefinition(addIndexDef);
 
         // remove one.
-        ColumnDefinition removeIndexDef = ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(new byte[] { 0 }), BytesType.instance)
-                                                          .setIndex("0", IndexType.KEYS, null);
+        ColumnDefinition removeIndexDef = ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(new byte[] { 0 }), BytesType.instance);
         assertTrue(cfNew.removeColumnDefinition(removeIndexDef));
 
         cfm.apply(cfNew);
@@ -505,19 +503,36 @@ public class DefsTest
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE6).getColumnFamilyStore(TABLE1i);
 
         // insert some data.  save the sstable descriptor so we can make sure it's marked for delete after the drop
-        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (key, c1, birthdate, notbirthdate) VALUES (?, ?, ?, ?)",
-                                                     KEYSPACE6, TABLE1i),
+        QueryProcessor.executeInternal(String.format(
+                                                    "INSERT INTO %s.%s (key, c1, birthdate, notbirthdate) VALUES (?, ?, ?, ?)",
+                                                    KEYSPACE6,
+                                                    TABLE1i),
                                        "key0", "col0", 1L, 1L);
 
         cfs.forceBlockingFlush();
-        ColumnFamilyStore indexedCfs = cfs.indexManager.getIndexForColumn(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"))).getIndexCfs();
+        ColumnDefinition indexedColumn = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
+        SecondaryIndex index = cfs.indexManager.getIndexForColumn(indexedColumn);
+        ColumnFamilyStore indexedCfs = index.getIndexCfs();
         Descriptor desc = indexedCfs.getLiveSSTables().iterator().next().descriptor;
 
         // drop the index
         CFMetaData meta = cfs.metadata.copy();
-        ColumnDefinition cdOld = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        ColumnDefinition cdNew = ColumnDefinition.regularDef(meta, cdOld.name.bytes, cdOld.type);
-        meta.addOrReplaceColumnDefinition(cdNew);
+        // We currently have a mismatch between IndexMetadata.name (which is simply the name
+        // of the index) and what gets returned from SecondaryIndex#getIndexName() (usually, this
+        // defaults to <tablename>.<indexname>.
+        // IndexMetadata takes its lead from the prior implementation of ColumnDefinition.name
+        // which did not include the table name.
+        // This mismatch causes some other, long standing inconsistencies:
+        // nodetool rebuild_index <ks> <tbl> <idx>  - <idx> must be qualified, i.e. include the redundant table name
+        //                                            without it, the rebuild silently fails
+        // system.IndexInfo (which is also exposed over JMX as CF.BuildIndexes) uses the form <tbl>.<idx>
+        // cqlsh> describe index [<ks>.]<idx>  - here <idx> must not be qualified by the table name.
+        //
+        // This should get resolved as part of #9459 by better separating the index name from the
+        // name of it's underlying CFS (if it as one), as the comment in CFMetaData#indexColumnFamilyName promises
+        // Then we will be able to just use the value of SI#getIndexName() when removing an index from CFMetaData
+        IndexMetadata existing = meta.getIndexes().iterator().next();
+        meta.indexes(meta.getIndexes().without(existing.name));
         MigrationManager.announceColumnFamilyUpdate(meta, false);
 
         // check

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index 1b0fb12..a1f9570 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -474,15 +474,39 @@ public class LegacySchemaMigratorTest
                     ? ""
                     : column.name.toString();
 
-        RowUpdateBuilder adder = new RowUpdateBuilder(SystemKeyspace.LegacyColumns, timestamp, mutation).clustering(table.cfName, name);
+        final RowUpdateBuilder adder = new RowUpdateBuilder(SystemKeyspace.LegacyColumns, timestamp, mutation).clustering(table.cfName, name);
 
         adder.add("validator", column.type.toString())
              .add("type", serializeKind(column.kind, table.isDense()))
-             .add("component_index", column.isOnAllComponents() ? null : column.position())
-             .add("index_name", column.getIndexName())
-             .add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString())
-             .add("index_options", json(column.getIndexOptions()))
-             .build();
+             .add("component_index", column.isOnAllComponents() ? null : column.position());
+
+        Optional<IndexMetadata> index = findIndexForColumn(table.getIndexes(), table, column);
+        if (index.isPresent())
+        {
+            IndexMetadata i = index.get();
+            adder.add("index_name", i.name);
+            adder.add("index_type", i.indexType.toString());
+            adder.add("index_options", json(i.options));
+        }
+        else
+        {
+            adder.add("index_name", null);
+            adder.add("index_type", null);
+            adder.add("index_options", null);
+        }
+
+        adder.build();
+    }
+
+    private static Optional<IndexMetadata> findIndexForColumn(Indexes indexes,
+                                                                CFMetaData table,
+                                                                ColumnDefinition column)
+    {
+        for (IndexMetadata index : indexes)
+            if (index.indexedColumn(table).equals(column))
+                return Optional.of(index);
+
+        return Optional.empty();
     }
 
     private static String serializeKind(ColumnDefinition.Kind kind, boolean isDense)