You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/18 16:06:09 UTC

[6/7] cassandra git commit: Move 2i metadata out of system_schema.columns and ColumnDefinition

Move 2i metadata out of system_schema.columns and ColumnDefinition

Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko and Robert Stupp
for CASSANDRA-6717


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06c130e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06c130e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06c130e3

Branch: refs/heads/trunk
Commit: 06c130e3cb85577041b475084400c08c505d8f9e
Parents: 310378d
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Mon Aug 10 16:09:15 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Tue Aug 18 14:59:44 2015 +0100

----------------------------------------------------------------------
 build.xml                                       |   4 +-
 ...ra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar | Bin 2163939 -> 0 bytes
 ...ore-3.0.0-alpha2-188d996-SNAPSHOT-shaded.jar | Bin 0 -> 2204619 bytes
 ...ver-internal-only-2.6.0rc2.post0-1a480f1.zip | Bin 200621 -> 0 bytes
 ...iver-internal-only-3.0.0a1.post0-90a797e.zip | Bin 0 -> 201086 bytes
 .../org/apache/cassandra/config/CFMetaData.java | 159 ++++----------
 .../cassandra/config/ColumnDefinition.java      | 116 +----------
 .../org/apache/cassandra/config/IndexType.java  |  25 ---
 .../cql3/functions/JavaBasedUDFunction.java     |   2 +-
 .../cassandra/cql3/functions/JavaUDF.java       |  16 +-
 .../cql3/functions/ScriptBasedUDFunction.java   |   2 +-
 .../cassandra/cql3/functions/UDFunction.java    |  12 +-
 .../cassandra/cql3/functions/UDHelper.java      |  34 ++-
 .../cql3/statements/AlterTableStatement.java    |  19 +-
 .../cql3/statements/CreateIndexStatement.java   |  79 ++++---
 .../cql3/statements/DropIndexStatement.java     |  79 +++----
 .../cassandra/cql3/statements/IndexTarget.java  |  23 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  19 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   2 +-
 .../cassandra/db/index/SecondaryIndex.java      |  92 ++++----
 .../db/index/SecondaryIndexManager.java         |  27 +--
 .../db/index/composites/CompositesIndex.java    |  19 +-
 .../cassandra/db/index/keys/KeysIndex.java      |   5 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  38 ++++
 .../io/sstable/format/SSTableReader.java        |   6 +-
 .../apache/cassandra/schema/IndexMetadata.java  | 167 +++++++++++++++
 .../org/apache/cassandra/schema/Indexes.java    | 208 +++++++++++++++++++
 .../cassandra/schema/KeyspaceMetadata.java      |  23 ++
 .../cassandra/schema/LegacySchemaMigrator.java  | 106 ++++++++--
 .../apache/cassandra/schema/SchemaKeyspace.java | 196 ++++++++++++++---
 .../cassandra/thrift/CassandraServer.java       |   3 -
 .../cassandra/thrift/ThriftConversion.java      |  74 ++++++-
 .../utils/NativeSSTableLoaderClient.java        |   2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  42 ++--
 .../cassandra/config/ColumnDefinitionTest.java  |   6 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |  21 +-
 .../cassandra/cql3/IndexQueryPagingTest.java    |   2 +-
 .../validation/entities/UFPureScriptTest.java   |  13 +-
 .../cql3/validation/entities/UFTest.java        |  14 +-
 .../cassandra/db/BatchlogManagerTest.java       |   4 +-
 .../apache/cassandra/db/DirectoriesTest.java    |  12 +-
 .../apache/cassandra/db/RangeTombstoneTest.java | 106 ++++++----
 .../apache/cassandra/db/SecondaryIndexTest.java |  21 +-
 .../db/index/PerRowSecondaryIndexTest.java      |   3 +-
 .../org/apache/cassandra/schema/DefsTest.java   |  39 ++--
 .../schema/LegacySchemaMigratorTest.java        |  36 +++-
 .../cassandra/stress/util/JavaDriverClient.java |   2 +-
 47 files changed, 1261 insertions(+), 617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 06638f6..8df738c 100644
--- a/build.xml
+++ b/build.xml
@@ -134,13 +134,11 @@
     <!-- Check if all tests are being run or just one. If it's all tests don't spam the console with test output.
          If it's an individual test print the output from the test under the assumption someone is debugging the test
          and wants to know what is going on without having to context switch to the log file that is generated.
-         This may be overridden when running a single test by adding the -Dtest.brief.output property to the ant
-         command (its value is unimportant).
          Debug level output still needs to be retrieved from the log file.  -->
     <script language="javascript">
         if (project.getProperty("cassandra.keepBriefBrief") == null)
         {
-            if (project.getProperty("test.name").equals("*Test") || project.getProperty("test.brief.output"))
+            if (project.getProperty("test.name").equals("*Test"))
                 project.setProperty("cassandra.keepBriefBrief", "true");
             else
                 project.setProperty("cassandra.keepBriefBrief", "false");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar
deleted file mode 100644
index edb926d..0000000
Binary files a/lib/cassandra-driver-core-2.2.0-rc2-SNAPSHOT-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/lib/cassandra-driver-core-3.0.0-alpha2-188d996-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha2-188d996-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha2-188d996-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..14354bd
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-alpha2-188d996-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/lib/cassandra-driver-internal-only-2.6.0rc2.post0-1a480f1.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-2.6.0rc2.post0-1a480f1.zip b/lib/cassandra-driver-internal-only-2.6.0rc2.post0-1a480f1.zip
deleted file mode 100644
index a611501..0000000
Binary files a/lib/cassandra-driver-internal-only-2.6.0rc2.post0-1a480f1.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/lib/cassandra-driver-internal-only-3.0.0a1.post0-90a797e.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a1.post0-90a797e.zip b/lib/cassandra-driver-internal-only-3.0.0a1.post0-90a797e.zip
new file mode 100644
index 0000000..0331144
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a1.post0-90a797e.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index a830469..e794901 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -30,7 +30,8 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -42,16 +43,16 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.AbstractIterator;
 import org.github.jamm.Unmetered;
 
 /**
@@ -95,6 +96,7 @@ public final class CFMetaData
     private volatile Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
     private volatile Triggers triggers = Triggers.none();
     private volatile MaterializedViews materializedViews = MaterializedViews.none();
+    private volatile Indexes indexes = Indexes.none();
 
     /*
      * All CQL3 columns definition are stored in the columnMetadata map.
@@ -224,6 +226,12 @@ public final class CFMetaData
         return this;
     }
 
+    public CFMetaData indexes(Indexes indexes)
+    {
+        this.indexes = indexes;
+        return this;
+    }
+
     private CFMetaData(String keyspace,
                        String name,
                        UUID cfId,
@@ -302,6 +310,11 @@ public final class CFMetaData
         return materializedViews;
     }
 
+    public Indexes getIndexes()
+    {
+        return indexes;
+    }
+
     public static CFMetaData create(String ksName,
                                     String name,
                                     UUID cfId,
@@ -491,7 +504,8 @@ public final class CFMetaData
         return newCFMD.params(oldCFMD.params)
                       .droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
                       .triggers(oldCFMD.triggers)
-                      .materializedViews(oldCFMD.materializedViews);
+                      .materializedViews(oldCFMD.materializedViews)
+                      .indexes(oldCFMD.indexes);
     }
 
     /**
@@ -502,18 +516,10 @@ public final class CFMetaData
      *
      * @return name of the index ColumnFamily
      */
-    public String indexColumnFamilyName(ColumnDefinition info)
+    public String indexColumnFamilyName(IndexMetadata info)
     {
         // TODO simplify this when info.index_name is guaranteed to be set
-        return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + (info.getIndexName() == null ? ByteBufferUtil.bytesToHex(info.name.bytes) : info.getIndexName());
-    }
-
-    /**
-     * The '.' char is the only way to identify if the CFMetadata is for a secondary index
-     */
-    public boolean isSecondaryIndex()
-    {
-        return cfName.contains(".");
+        return cfName + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name;
     }
 
     /**
@@ -689,7 +695,8 @@ public final class CFMetaData
             && Objects.equal(columnMetadata, other.columnMetadata)
             && Objects.equal(droppedColumns, other.droppedColumns)
             && Objects.equal(triggers, other.triggers)
-            && Objects.equal(materializedViews, other.materializedViews);
+            && Objects.equal(materializedViews, other.materializedViews)
+            && Objects.equal(indexes, other.indexes);
     }
 
     @Override
@@ -707,6 +714,7 @@ public final class CFMetaData
             .append(droppedColumns)
             .append(triggers)
             .append(materializedViews)
+            .append(indexes)
             .toHashCode();
     }
 
@@ -752,6 +760,7 @@ public final class CFMetaData
 
         triggers = cfm.triggers;
         materializedViews = cfm.materializedViews;
+        indexes = cfm.indexes;
 
         logger.debug("application result is {}", this);
 
@@ -820,74 +829,11 @@ public final class CFMetaData
         return columnMetadata.get(name);
     }
 
-    public ColumnDefinition getColumnDefinitionForIndex(String indexName)
-    {
-        for (ColumnDefinition def : allColumns())
-        {
-            if (indexName.equals(def.getIndexName()))
-                return def;
-        }
-        return null;
-    }
-
-    /**
-     * Convert a null index_name to appropriate default name according to column status
-     */
-    public void addDefaultIndexNames() throws ConfigurationException
-    {
-        // if this is ColumnFamily update we need to add previously defined index names to the existing columns first
-        UUID cfId = Schema.instance.getId(ksName, cfName);
-        if (cfId != null)
-        {
-            CFMetaData cfm = Schema.instance.getCFMetaData(cfId);
-
-            for (ColumnDefinition newDef : allColumns())
-            {
-                if (!cfm.columnMetadata.containsKey(newDef.name.bytes) || newDef.getIndexType() == null)
-                    continue;
-
-                String oldIndexName = cfm.getColumnDefinition(newDef.name).getIndexName();
-
-                if (oldIndexName == null)
-                    continue;
-
-                if (newDef.getIndexName() != null && !oldIndexName.equals(newDef.getIndexName()))
-                    throw new ConfigurationException("Can't modify index name: was '" + oldIndexName + "' changed to '" + newDef.getIndexName() + "'.");
-
-                newDef.setIndexName(oldIndexName);
-            }
-        }
-
-        Set<String> existingNames = existingIndexNames(null);
-        for (ColumnDefinition column : allColumns())
-        {
-            if (column.getIndexType() != null && column.getIndexName() == null)
-            {
-                String baseName = getDefaultIndexName(cfName, column.name);
-                String indexName = baseName;
-                int i = 0;
-                while (existingNames.contains(indexName))
-                    indexName = baseName + '_' + (++i);
-                column.setIndexName(indexName);
-            }
-        }
-    }
-
-    public static String getDefaultIndexName(String cfName, ColumnIdentifier columnName)
-    {
-        return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
-    }
-
     public static boolean isNameValid(String name)
     {
         return name != null && !name.isEmpty() && name.length() <= Schema.NAME_LENGTH && name.matches("\\w+");
     }
 
-    public static boolean isIndexNameValid(String name)
-    {
-        return name != null && !name.isEmpty() && name.matches("\\w+");
-    }
-
     public CFMetaData validate() throws ConfigurationException
     {
         rebuild();
@@ -921,49 +867,28 @@ public final class CFMetaData
                     throw new ConfigurationException("Cannot add a counter column (" + def.name + ") in a non counter column family");
         }
 
+        if (!indexes.isEmpty() && isSuper())
+            throw new ConfigurationException("Secondary indexes are not supported on super column families");
+
         // initialize a set of names NOT in the CF under consideration
-        Set<String> indexNames = existingIndexNames(cfName);
-        for (ColumnDefinition c : allColumns())
+        KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
+        Set<String> indexNames = ksm == null ? new HashSet<>() : ksm.existingIndexNames(cfName);
+        for (IndexMetadata index : indexes)
         {
-            if (c.getIndexType() == null)
-            {
-                if (c.getIndexName() != null)
-                    throw new ConfigurationException("Index name cannot be set without index type");
-            }
-            else
-            {
-                if (isSuper())
-                    throw new ConfigurationException("Secondary indexes are not supported on super column families");
-                if (!isIndexNameValid(c.getIndexName()))
-                    throw new ConfigurationException("Illegal index name " + c.getIndexName());
-                // check index names against this CF _and_ globally
-                if (indexNames.contains(c.getIndexName()))
-                    throw new ConfigurationException("Duplicate index name " + c.getIndexName());
-                indexNames.add(c.getIndexName());
-
-                if (c.getIndexType() == IndexType.CUSTOM)
-                {
-                    if (c.getIndexOptions() == null || !c.hasIndexOption(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME))
-                        throw new ConfigurationException("Required index option missing: " + SecondaryIndex.CUSTOM_INDEX_OPTION_NAME);
-                }
-
-                // This method validates the column metadata but does not intialize the index
-                SecondaryIndex.createInstance(null, c);
-            }
+            index.validate();
+
+            // check index names against this CF _and_ globally
+            if (indexNames.contains(index.name))
+                throw new ConfigurationException("Duplicate index name " + index.name);
+            indexNames.add(index.name);
+
+            // This method validates any custom options in the index metadata but does not intialize the index
+            SecondaryIndex.validate(this, index);
         }
 
         return this;
     }
 
-    private static Set<String> existingIndexNames(String cfToExclude)
-    {
-        Set<String> indexNames = new HashSet<>();
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-            if (cfToExclude == null || !cfs.name.equals(cfToExclude))
-                for (ColumnDefinition cd : cfs.metadata.allColumns())
-                    indexNames.add(cd.getIndexName());
-        return indexNames;
-    }
 
     // The comparator to validate the definition name with thrift.
     public AbstractType<?> thriftColumnNameType()
@@ -1048,7 +973,7 @@ public final class CFMetaData
         {
             throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
         }
-        else if (def.isIndexed())
+        else if (getIndexes().hasIndexFor(def))
         {
             throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", from));
         }
@@ -1172,6 +1097,7 @@ public final class CFMetaData
                     .collect(Collectors.toSet());
     }
 
+
     @Override
     public String toString()
     {
@@ -1190,6 +1116,7 @@ public final class CFMetaData
             .append("droppedColumns", droppedColumns)
             .append("triggers", triggers)
             .append("materializedViews", materializedViews)
+            .append("indexes", indexes)
             .toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 8d8a929..6afd3e7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.exceptions.*;
 
 public class ColumnDefinition extends ColumnSpecification implements Comparable<ColumnDefinition>
 {
@@ -60,10 +59,6 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
 
     public final Kind kind;
 
-    private String indexName;
-    private IndexType indexType;
-    private Map<String,String> indexOptions;
-
     /*
      * If the column comparator is a composite type, indicates to which
      * component this definition refers to. If null, the definition refers to
@@ -93,7 +88,7 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
 
     public static ColumnDefinition partitionKeyDef(String ksName, String cfName, String name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true), validator, null, null, null, componentIndex, Kind.PARTITION_KEY);
+        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true), validator, componentIndex, Kind.PARTITION_KEY);
     }
 
     public static ColumnDefinition clusteringKeyDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
@@ -103,7 +98,7 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
 
     public static ColumnDefinition clusteringKeyDef(String ksName, String cfName, String name, AbstractType<?> validator, Integer componentIndex)
     {
-        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true),  validator, null, null, null, componentIndex, Kind.CLUSTERING);
+        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true),  validator, componentIndex, Kind.CLUSTERING);
     }
 
     public static ColumnDefinition regularDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator)
@@ -113,7 +108,7 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
 
     public static ColumnDefinition regularDef(String ksName, String cfName, String name, AbstractType<?> validator)
     {
-        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true), validator, null, null, null, null, Kind.REGULAR);
+        return new ColumnDefinition(ksName, cfName, ColumnIdentifier.getInterned(name, true), validator, null, Kind.REGULAR);
     }
 
     public static ColumnDefinition staticDef(CFMetaData cfm, ByteBuffer name, AbstractType<?> validator)
@@ -127,26 +122,15 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
              cfm.cfName,
              ColumnIdentifier.getInterned(name, cfm.getColumnDefinitionNameComparator(kind)),
              validator,
-             null,
-             null,
-             null,
              componentIndex,
              kind);
     }
 
-    public ColumnDefinition(String ksName, String cfName, ColumnIdentifier name, AbstractType<?> type, Integer componentIndex, Kind kind)
-    {
-        this(ksName, cfName, name, type, null, null, null, componentIndex, kind);
-    }
-
     @VisibleForTesting
     public ColumnDefinition(String ksName,
                             String cfName,
                             ColumnIdentifier name,
                             AbstractType<?> validator,
-                            IndexType indexType,
-                            Map<String, String> indexOptions,
-                            String indexName,
                             Integer componentIndex,
                             Kind kind)
     {
@@ -156,9 +140,7 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         assert componentIndex == null || kind.isPrimaryKeyKind(); // The componentIndex really only make sense for partition and clustering columns,
                                                                   // so make sure we don't sneak it for something else since it'd breaks equals()
         this.kind = kind;
-        this.indexName = indexName;
         this.componentIndex = componentIndex;
-        this.setIndexType(indexType, indexOptions);
         this.cellPathComparator = makeCellPathComparator(kind, validator);
         this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
         this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b);
@@ -193,17 +175,17 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
 
     public ColumnDefinition copy()
     {
-        return new ColumnDefinition(ksName, cfName, name, type, indexType, indexOptions, indexName, componentIndex, kind);
+        return new ColumnDefinition(ksName, cfName, name, type, componentIndex, kind);
     }
 
     public ColumnDefinition withNewName(ColumnIdentifier newName)
     {
-        return new ColumnDefinition(ksName, cfName, newName, type, indexType, indexOptions, indexName, componentIndex, kind);
+        return new ColumnDefinition(ksName, cfName, newName, type, componentIndex, kind);
     }
 
     public ColumnDefinition withNewType(AbstractType<?> newType)
     {
-        return new ColumnDefinition(ksName, cfName, name, newType, indexType, indexOptions, indexName, componentIndex, kind);
+        return new ColumnDefinition(ksName, cfName, name, newType, componentIndex, kind);
     }
 
     public boolean isOnAllComponents()
@@ -255,16 +237,13 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
             && Objects.equal(name, cd.name)
             && Objects.equal(type, cd.type)
             && Objects.equal(kind, cd.kind)
-            && Objects.equal(componentIndex, cd.componentIndex)
-            && Objects.equal(indexName, cd.indexName)
-            && Objects.equal(indexType, cd.indexType)
-            && Objects.equal(indexOptions, cd.indexOptions);
+            && Objects.equal(componentIndex, cd.componentIndex);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(ksName, cfName, name, type, kind, componentIndex, indexName, indexType, indexOptions);
+        return Objects.hashCode(ksName, cfName, name, type, kind, componentIndex);
     }
 
     @Override
@@ -275,8 +254,6 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
                       .add("type", type)
                       .add("kind", kind)
                       .add("componentIndex", componentIndex)
-                      .add("indexName", indexName)
-                      .add("indexType", indexType)
                       .toString();
     }
 
@@ -302,83 +279,6 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
             return kind == Kind.STATIC;
     }
 
-    public ColumnDefinition apply(ColumnDefinition def)  throws ConfigurationException
-    {
-        assert kind == def.kind && Objects.equal(componentIndex, def.componentIndex);
-
-        if (getIndexType() != null && def.getIndexType() != null)
-        {
-            // If an index is set (and not drop by this update), the validator shouldn't be change to a non-compatible one
-            // (and we want true comparator compatibility, not just value one, since the validator is used by LocalPartitioner to order index rows)
-            if (!def.type.isCompatibleWith(type))
-                throw new ConfigurationException(String.format("Cannot modify validator to a non-order-compatible one for column %s since an index is set", name));
-
-            assert getIndexName() != null;
-            if (!getIndexName().equals(def.getIndexName()))
-                throw new ConfigurationException("Cannot modify index name: " + def.getIndexName());
-        }
-
-        return new ColumnDefinition(ksName,
-                                    cfName,
-                                    name,
-                                    def.type,
-                                    def.getIndexType(),
-                                    def.getIndexOptions(),
-                                    def.getIndexName(),
-                                    componentIndex,
-                                    kind);
-    }
-
-    public String getIndexName()
-    {
-        return indexName;
-    }
-
-    public ColumnDefinition setIndexName(String indexName)
-    {
-        this.indexName = indexName;
-        return this;
-    }
-
-    public ColumnDefinition setIndexType(IndexType indexType, Map<String,String> indexOptions)
-    {
-        this.indexType = indexType;
-        this.indexOptions = indexOptions;
-        return this;
-    }
-
-    public ColumnDefinition setIndex(String indexName, IndexType indexType, Map<String,String> indexOptions)
-    {
-        return setIndexName(indexName).setIndexType(indexType, indexOptions);
-    }
-
-    public boolean isIndexed()
-    {
-        return indexType != null;
-    }
-
-    public IndexType getIndexType()
-    {
-        return indexType;
-    }
-
-    public Map<String,String> getIndexOptions()
-    {
-        return indexOptions;
-    }
-
-    /**
-     * Checks if the index option with the specified name has been specified.
-     *
-     * @param name index option name
-     * @return <code>true</code> if the index option with the specified name has been specified, <code>false</code>
-     * otherwise.
-     */
-    public boolean hasIndexOption(String name)
-    {
-        return indexOptions != null && indexOptions.containsKey(name);
-    }
-
     /**
      * Converts the specified column definitions into column identifiers.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/config/IndexType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/IndexType.java b/src/java/org/apache/cassandra/config/IndexType.java
deleted file mode 100644
index d39dccb..0000000
--- a/src/java/org/apache/cassandra/config/IndexType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.config;
-
-public enum IndexType
-{
-    KEYS,
-    CUSTOM,
-    COMPOSITES
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
index 126160d..2aafeb9 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaBasedUDFunction.java
@@ -171,7 +171,7 @@ final class JavaBasedUDFunction extends UDFunction
         // javaParamTypes is just the Java representation for argTypes resp. argDataTypes
         Class<?>[] javaParamTypes = UDHelper.javaTypes(argDataTypes, calledOnNullInput);
         // javaReturnType is just the Java representation for returnType resp. returnDataType
-        Class<?> javaReturnType = returnDataType.asJavaClass();
+        Class<?> javaReturnType = UDHelper.asJavaClass(returnDataType);
 
         // put each UDF in a separate package to prevent cross-UDF code access
         String pkgName = BASE_PACKAGE + '.' + generateClassName(name, 'p');

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java b/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
index 09b6d58..47f5d47 100644
--- a/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
+++ b/src/java/org/apache/cassandra/cql3/functions/JavaUDF.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.ProtocolVersion;
-import org.apache.cassandra.db.marshal.AbstractType;
 
 /**
  * Base class for all Java UDFs.
@@ -58,48 +56,48 @@ public abstract class JavaUDF
     protected float compose_float(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (float) DataType.cfloat().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (float) UDHelper.deserialize(DataType.cfloat(), protocolVersion, value);
     }
 
     // do not remove - used by generated Java UDFs
     protected double compose_double(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (double) DataType.cdouble().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (double) UDHelper.deserialize(DataType.cdouble(), protocolVersion, value);
     }
 
     // do not remove - used by generated Java UDFs
     protected byte compose_byte(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (byte) DataType.tinyint().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (byte) UDHelper.deserialize(DataType.tinyint(), protocolVersion, value);
     }
 
     // do not remove - used by generated Java UDFs
     protected short compose_short(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (short) DataType.smallint().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (short) UDHelper.deserialize(DataType.smallint(), protocolVersion, value);
     }
 
     // do not remove - used by generated Java UDFs
     protected int compose_int(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (int) DataType.cint().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (int) UDHelper.deserialize(DataType.cint(), protocolVersion, value);
     }
 
     // do not remove - used by generated Java UDFs
     protected long compose_long(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (long) DataType.bigint().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (long) UDHelper.deserialize(DataType.bigint(), protocolVersion, value);
     }
 
     // do not remove - used by generated Java UDFs
     protected boolean compose_boolean(int protocolVersion, int argIndex, ByteBuffer value)
     {
         assert value != null && value.remaining() > 0;
-        return (boolean) DataType.cboolean().deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return (boolean) UDHelper.deserialize(DataType.cboolean(), protocolVersion, value);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
index c4faa72..19ef769 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDFunction.java
@@ -185,7 +185,7 @@ final class ScriptBasedUDFunction extends UDFunction
         if (result == null)
             return null;
 
-        Class<?> javaReturnType = returnDataType.asJavaClass();
+        Class<?> javaReturnType = UDHelper.asJavaClass(returnDataType);
         Class<?> resultType = result.getClass();
         if (!javaReturnType.isAssignableFrom(resultType))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 12b1f93..e21d8af 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@ -36,14 +36,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Objects;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.UserType;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -274,6 +271,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
             ByteBuffer result = DatabaseDescriptor.enableUserDefinedFunctionsThreads()
                                 ? executeAsync(protocolVersion, parameters)
                                 : executeUserDefined(protocolVersion, parameters);
+
             Tracing.trace("Executed UDF {} in {}\u03bcs", name(), (System.nanoTime() - tStart) / 1000);
             return result;
         }
@@ -313,8 +311,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
             threadMXBean.getCurrentThreadCpuTime();
             //
             // Get the TypeCodec stuff in Java Driver initialized.
-            DataType.inet().format(InetAddress.getLoopbackAddress());
-            DataType.list(DataType.ascii()).format(Collections.emptyList());
+            UDHelper.codecRegistry.codecFor(DataType.inet()).format(InetAddress.getLoopbackAddress());
+            UDHelper.codecRegistry.codecFor(DataType.ascii()).format("");
         }
 
         void setup()
@@ -475,7 +473,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 
     protected static Object compose(DataType[] argDataTypes, int protocolVersion, int argIndex, ByteBuffer value)
     {
-        return value == null ? null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return value == null ? null : UDHelper.deserialize(argDataTypes[argIndex], protocolVersion, value);
     }
 
     /**
@@ -492,7 +490,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct
 
     protected static ByteBuffer decompose(DataType dataType, int protocolVersion, Object value)
     {
-        return value == null ? null : dataType.serialize(value, ProtocolVersion.fromInt(protocolVersion));
+        return value == null ? null : UDHelper.serialize(dataType, protocolVersion, value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
index d1d12e6..df6ca1f 100644
--- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java
@@ -23,9 +23,14 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.transport.Server;
 
 /**
  * Helper class for User Defined Functions + Aggregates.
@@ -34,14 +39,16 @@ public final class UDHelper
 {
     // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502
     static final MethodHandle methodParseOne;
+    static final CodecRegistry codecRegistry;
     static
     {
         try
         {
             Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser");
-            Method m = cls.getDeclaredMethod("parseOne", String.class);
+            Method m = cls.getDeclaredMethod("parseOne", String.class, ProtocolVersion.class, CodecRegistry.class);
             m.setAccessible(true);
             methodParseOne = MethodHandles.lookup().unreflect(m);
+            codecRegistry = new CodecRegistry();
         }
         catch (Exception e)
         {
@@ -61,7 +68,7 @@ public final class UDHelper
         Class<?>[] paramTypes = new Class[dataTypes.length];
         for (int i = 0; i < paramTypes.length; i++)
         {
-            Class<?> clazz = dataTypes[i].asJavaClass();
+            Class<?> clazz = asJavaClass(dataTypes[i]);
             if (!calledOnNullInput)
             {
                 // only care about classes that can be used in a data type
@@ -108,7 +115,9 @@ public final class UDHelper
         CQL3Type cqlType = abstractType.asCQL3Type();
         try
         {
-            return (DataType) methodParseOne.invoke(cqlType.getType().toString());
+            return (DataType) methodParseOne.invoke(cqlType.getType().toString(),
+                                                    ProtocolVersion.fromInt(Server.CURRENT_VERSION),
+                                                    codecRegistry);
         }
         catch (RuntimeException | Error e)
         {
@@ -121,6 +130,25 @@ public final class UDHelper
         }
     }
 
+    public static Object deserialize(DataType dataType, int protocolVersion, ByteBuffer value)
+    {
+        return codecRegistry.codecFor(dataType).deserialize(value, ProtocolVersion.fromInt(protocolVersion));
+    }
+
+    public static ByteBuffer serialize(DataType dataType, int protocolVersion, Object value)
+    {
+        TypeCodec<Object> codec = codecRegistry.codecFor(dataType);
+        if (! codec.getJavaType().getRawType().isAssignableFrom(value.getClass()))
+            throw new InvalidTypeException("Invalid value for CQL type " + dataType.getName().toString());
+
+        return codec.serialize(value, ProtocolVersion.fromInt(protocolVersion));
+    }
+
+    public static Class<?> asJavaClass(DataType dataType)
+    {
+        return codecRegistry.codecFor(dataType).getJavaType().getRawType();
+    }
+
     public static boolean isNullOrEmpty(AbstractType<?> type, ByteBuffer bb)
     {
         return bb == null ||

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index b7e09d9..a0136fa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -22,12 +22,13 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
@@ -263,6 +264,12 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         break;
                 }
 
+                // If a column is dropped which is the target of a secondary index
+                // we need to also drop the index.
+                cfm.getIndexes().get(def).ifPresent(indexMetadata -> {
+                    cfm.indexes(cfm.getIndexes().without(indexMetadata.name));
+                });
+
                 // If a column is dropped which is the target of a materialized view,
                 // then we need to drop the view.
                 // If a column is dropped which was selected into a materialized view,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index cc808ac..64bb1da 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import java.util.Collections;
 import java.util.Map;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,11 +28,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.IndexName;
 import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.thrift.ThriftValidation;
@@ -91,21 +96,31 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
         }
 
-        if (cd.getIndexType() != null)
+        Indexes existingIndexes = cfm.getIndexes();
+        for (IndexMetadata index : existingIndexes)
         {
-            IndexTarget.TargetType prevType = IndexTarget.TargetType.fromColumnDefinition(cd);
-            if (isMap && target.type != prevType)
+            if (index.indexedColumn(cfm).equals(cd))
             {
-                String msg = "Cannot create index on %s(%s): an index on %s(%s) already exists and indexing " +
-                             "a map on more than one dimension at the same time is not currently supported";
-                throw new InvalidRequestException(String.format(msg,
-                                                                target.type, target.column,
-                                                                prevType, target.column));
+                IndexTarget.TargetType prevType = IndexTarget.TargetType.fromIndexMetadata(index, cfm);
+                if (isMap && target.type != prevType)
+                {
+                    String msg = "Cannot create index on %s(%s): an index on %s(%s) already exists and indexing " +
+                                 "a map on more than one dimension at the same time is not currently supported";
+                    throw new InvalidRequestException(String.format(msg,
+                                                                    target.type, target.column,
+                                                                    prevType, target.column));
+                }
+
+                if (ifNotExists)
+                    return;
+                else
+                    throw new InvalidRequestException("Index already existss");
             }
+        }
 
-            if (ifNotExists)
-                return;
-            else
+        if (!Strings.isNullOrEmpty(indexName))
+        {
+            if (Schema.instance.getKSMetaData(keyspace()).existingIndexNames(null).contains(indexName))
                 throw new InvalidRequestException("Index already exists");
         }
 
@@ -166,15 +181,25 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     {
         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily()).copy();
         IndexTarget target = rawTarget.prepare(cfm);
-        logger.debug("Updating column {} definition for index {}", target.column, indexName);
-        ColumnDefinition cd = cfm.getColumnDefinition(target.column);
-
-        if (cd.getIndexType() != null && ifNotExists)
-            return false;
 
+        ColumnDefinition cd = cfm.getColumnDefinition(target.column);
+        String acceptedName = indexName;
+        if (Strings.isNullOrEmpty(acceptedName))
+            acceptedName = Indexes.getAvailableIndexName(keyspace(), columnFamily(), cd.name);
+
+        for (IndexMetadata existing : cfm.getIndexes())
+            if (existing.indexedColumn(cfm).equals(cd) || existing.name.equals(acceptedName))
+                if (ifNotExists)
+                    return false;
+                else
+                    throw new InvalidRequestException("Index already exists");
+
+        IndexMetadata.IndexType indexType;
+        Map<String, String> indexOptions;
         if (properties.isCustom)
         {
-            cd.setIndexType(IndexType.CUSTOM, properties.getOptions());
+            indexType = IndexMetadata.IndexType.CUSTOM;
+            indexOptions = properties.getOptions();
         }
         else if (cfm.isCompound())
         {
@@ -184,15 +209,21 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             // lives easier then.
             if (cd.type.isCollection() && cd.type.isMultiCell())
                 options = ImmutableMap.of(target.type.indexOption(), "");
-            cd.setIndexType(IndexType.COMPOSITES, options);
+            indexType = IndexMetadata.IndexType.COMPOSITES;
+            indexOptions = options;
         }
         else
         {
-            cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+            indexType = IndexMetadata.IndexType.KEYS;
+            indexOptions = Collections.emptyMap();
         }
 
-        cd.setIndexName(indexName);
-        cfm.addDefaultIndexNames();
+
+
+        logger.debug("Updating index definition for {}", indexName);
+
+        IndexMetadata index = IndexMetadata.legacyIndex(cd, acceptedName, indexType, indexOptions);
+        cfm.indexes(cfm.getIndexes().with(index));
         MigrationManager.announceColumnFamilyUpdate(cfm, false, isLocalOnly);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index 4aae9ac..63a1200 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.IndexName;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.KeyspaceNotDefinedException;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -37,7 +37,7 @@ public class DropIndexStatement extends SchemaAlteringStatement
     public final boolean ifExists;
 
     // initialized in announceMigration()
-    private String indexedCF;
+    private CFMetaData indexedTable;
 
     public DropIndexStatement(IndexName indexName, boolean ifExists)
     {
@@ -48,23 +48,16 @@ public class DropIndexStatement extends SchemaAlteringStatement
 
     public String columnFamily()
     {
-        if (indexedCF != null)
-            return indexedCF;
-
-        try
-        {
-            CFMetaData cfm = findIndexedCF();
-            return cfm == null ? null : cfm.cfName;
-        }
-        catch (InvalidRequestException ire)
-        {
-            throw new RuntimeException(ire);
-        }
+        if (indexedTable != null)
+            return indexedTable.cfName;
+
+        indexedTable = lookupIndexedTable();
+        return indexedTable == null ? null : indexedTable.cfName;
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
     {
-        CFMetaData cfm = findIndexedCF();
+        CFMetaData cfm = lookupIndexedTable();
         if (cfm == null)
             return;
 
@@ -73,7 +66,7 @@ public class DropIndexStatement extends SchemaAlteringStatement
 
     public void validate(ClientState state)
     {
-        // validated in findIndexedCf()
+        // validated in lookupIndexedTable()
     }
 
     public Event.SchemaChange changeEvent()
@@ -85,57 +78,39 @@ public class DropIndexStatement extends SchemaAlteringStatement
     @Override
     public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
     {
-        announceMigration(false);
-        return indexedCF == null ? null : new ResultMessage.SchemaChange(changeEvent());
+        return announceMigration(false) ? new ResultMessage.SchemaChange(changeEvent()) : null;
     }
 
     public boolean announceMigration(boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
     {
-        CFMetaData cfm = findIndexedCF();
+        CFMetaData cfm = lookupIndexedTable();
         if (cfm == null)
             return false;
 
-        CFMetaData updatedCfm = updateCFMetadata(cfm);
-        indexedCF = updatedCfm.cfName;
+        indexedTable = cfm;
+        CFMetaData updatedCfm = cfm.copy();
+        updatedCfm.indexes(updatedCfm.getIndexes().without(indexName));
         MigrationManager.announceColumnFamilyUpdate(updatedCfm, false, isLocalOnly);
         return true;
     }
 
-    private CFMetaData updateCFMetadata(CFMetaData cfm)
+    private CFMetaData lookupIndexedTable()
     {
-        ColumnDefinition column = findIndexedColumn(cfm);
-        assert column != null;
-        CFMetaData cloned = cfm.copy();
-        ColumnDefinition toChange = cloned.getColumnDefinition(column.name);
-        assert toChange.getIndexName() != null && toChange.getIndexName().equals(indexName);
-        toChange.setIndexName(null);
-        toChange.setIndexType(null, null);
-        return cloned;
-    }
+        if (indexedTable != null)
+            return indexedTable;
 
-    private CFMetaData findIndexedCF() throws InvalidRequestException
-    {
         KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace());
         if (ksm == null)
             throw new KeyspaceNotDefinedException("Keyspace " + keyspace() + " does not exist");
 
-        for (CFMetaData cfm : ksm.tables)
-            if (findIndexedColumn(cfm) != null)
-                return cfm;
-
-        if (ifExists)
-            return null;
-        else
-            throw new InvalidRequestException("Index '" + indexName + "' could not be found in any of the tables of keyspace '" + keyspace() + '\'');
-    }
-
-    private ColumnDefinition findIndexedColumn(CFMetaData cfm)
-    {
-        for (ColumnDefinition column : cfm.allColumns())
-        {
-            if (column.getIndexType() != null && column.getIndexName() != null && column.getIndexName().equals(indexName))
-                return column;
-        }
-        return null;
+        return ksm.findIndexedTable(indexName)
+                  .orElseGet(() -> {
+                      if (ifExists)
+                          return null;
+                      else
+                          throw new InvalidRequestException(String.format("Index '%s' could not be found in any " +
+                                                                          "of the tables of keyspace '%s'",
+                                                                          indexName, keyspace()));
+                  });
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
index d602388..4abb077 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.schema.IndexMetadata;
 
 public class IndexTarget
 {
@@ -98,17 +99,29 @@ public class IndexTarget
             }
         }
 
-        public static TargetType fromColumnDefinition(ColumnDefinition cd)
+        public static TargetType fromIndexMetadata(IndexMetadata index, CFMetaData cfm)
         {
-            Map<String, String> options = cd.getIndexOptions();
+            Map<String, String> options = index.options;
             if (options.containsKey(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
+            {
                 return KEYS;
+            }
             else if (options.containsKey(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
+            {
                 return KEYS_AND_VALUES;
-            else if (cd.type.isCollection() && !cd.type.isMultiCell())
-                return FULL;
+            }
             else
-                return VALUES;
+            {
+                ColumnDefinition cd = index.indexedColumn(cfm);
+                if (cd.type.isCollection() && !cd.type.isMultiCell())
+                {
+                    return FULL;
+                }
+                else
+                {
+                    return VALUES;
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 202047b..a12de0a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -371,11 +371,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // create the private ColumnFamilyStores for the secondary column indexes
-        for (ColumnDefinition info : metadata.allColumns())
-        {
-            if (info.getIndexType() != null)
-                indexManager.addIndexedColumn(info);
-        }
+        for (IndexMetadata info : metadata.getIndexes())
+            indexManager.addIndexedColumn(info);
 
         if (registerBookkeeping)
         {
@@ -571,15 +568,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // also clean out any index leftovers.
-        for (ColumnDefinition def : metadata.allColumns())
-        {
-            if (def.isIndexed())
-            {
-                CFMetaData indexMetadata = SecondaryIndex.newIndexMetadata(metadata, def);
-                if (indexMetadata != null)
-                    scrubDataDirectories(indexMetadata);
-            }
-        }
+        for (IndexMetadata def : metadata.getIndexes())
+            if (!def.isCustom())
+                scrubDataDirectories(SecondaryIndex.newIndexMetadata(metadata, def));
     }
 
     // must be called after all sstables are loaded since row cache merges all row versions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index eccd7e2..a631b9a 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -53,7 +53,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
         columnDef = columnDefs.iterator().next();
 
-        CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef, getIndexKeyComparator());
+        CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, indexMetadata, getIndexKeyComparator());
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              indexedCfMetadata.cfName,
                                                              indexedCfMetadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 9221090..4302112 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -18,9 +18,7 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
@@ -32,7 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.IndexType;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -47,6 +45,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -87,6 +86,8 @@ public abstract class SecondaryIndex
      */
     protected final Set<ColumnDefinition> columnDefs = Collections.newSetFromMap(new ConcurrentHashMap<ColumnDefinition,Boolean>());
 
+    protected IndexMetadata indexMetadata;
+
     /**
      * Perform any initialization work
      */
@@ -101,10 +102,10 @@ public abstract class SecondaryIndex
     public abstract void reload();
 
     /**
-     * Validates the index_options passed in the ColumnDef
+     * Validates the index_options passed in the IndexMetadata
      * @throws ConfigurationException
      */
-    public abstract void validateOptions() throws ConfigurationException;
+    public abstract void validateOptions(CFMetaData baseCfm, IndexMetadata def) throws ConfigurationException;
 
     /**
      * @return The name of the index
@@ -268,6 +269,13 @@ public abstract class SecondaryIndex
         return columnDefs;
     }
 
+    void setIndexMetadata(IndexMetadata indexDef)
+    {
+        this.indexMetadata = indexDef;
+        for (ColumnIdentifier col : indexDef.columns)
+            this.columnDefs.add(baseCfs.metadata.getColumnDefinition(col));
+    }
+
     void addColumnDef(ColumnDefinition columnDef)
     {
        columnDefs.add(columnDef);
@@ -312,45 +320,56 @@ public abstract class SecondaryIndex
      * It will validate the index_options before initializing.
      *
      * @param baseCfs the source of data for the Index
-     * @param cdef the meta information about this column (index_type, index_options, name, etc...)
+     * @param indexDef the meta information about this index (index_type, index_options, name, etc...)
      *
      * @return The secondary index instance for this column
      * @throws ConfigurationException
      */
-    public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs, ColumnDefinition cdef) throws ConfigurationException
+    public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs,
+                                                IndexMetadata indexDef) throws ConfigurationException
+    {
+        SecondaryIndex index = uninitializedInstance(baseCfs.metadata, indexDef);
+        index.validateOptions(baseCfs.metadata, indexDef);
+        index.setBaseCfs(baseCfs);
+        index.setIndexMetadata(indexDef);
+
+        return index;
+    }
+
+    public static void validate(CFMetaData baseMetadata,
+                                IndexMetadata indexDef) throws ConfigurationException
     {
-        SecondaryIndex index;
+        SecondaryIndex index = uninitializedInstance(baseMetadata, indexDef);
+        index.validateOptions(baseMetadata, indexDef);
+    }
 
-        switch (cdef.getIndexType())
+    private static SecondaryIndex uninitializedInstance(CFMetaData baseMetadata,
+                                                        IndexMetadata indexDef) throws ConfigurationException
+    {
+        if (indexDef.isKeys())
+        {
+            return new KeysIndex();
+        }
+        else if (indexDef.isComposites())
+        {
+            return CompositesIndex.create(indexDef, baseMetadata);
+        }
+        else if (indexDef.isCustom())
         {
-        case KEYS:
-            index = new KeysIndex();
-            break;
-        case COMPOSITES:
-            index = CompositesIndex.create(cdef);
-            break;
-        case CUSTOM:
-            assert cdef.getIndexOptions() != null;
-            String class_name = cdef.getIndexOptions().get(CUSTOM_INDEX_OPTION_NAME);
+            assert indexDef.options != null;
+            String class_name = indexDef.options.get(CUSTOM_INDEX_OPTION_NAME);
             assert class_name != null;
             try
             {
-                index = (SecondaryIndex) Class.forName(class_name).newInstance();
+                return (SecondaryIndex) Class.forName(class_name).newInstance();
             }
             catch (Exception e)
             {
                 throw new RuntimeException(e);
             }
-            break;
-            default:
-                throw new RuntimeException("Unknown index type: " + cdef.getIndexName());
         }
 
-        index.addColumnDef(cdef);
-        index.validateOptions();
-        index.setBaseCfs(baseCfs);
-
-        return index;
+        throw new AssertionError("Unknown index type: " + indexDef.name);
     }
 
     public abstract void validate(DecoratedKey partitionKey) throws InvalidRequestException;
@@ -372,32 +391,31 @@ public abstract class SecondaryIndex
     /**
      * Create the index metadata for the index on a given column of a given table.
      */
-    public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
+    public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, IndexMetadata def)
     {
-        return newIndexMetadata(baseMetadata, def, def.type);
+        return newIndexMetadata(baseMetadata, def, def.indexedColumn(baseMetadata).type);
     }
 
     /**
      * Create the index metadata for the index on a given column of a given table.
      */
-    static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator)
+    static CFMetaData newIndexMetadata(CFMetaData baseMetadata, IndexMetadata def, AbstractType<?> comparator)
     {
-        if (def.getIndexType() == IndexType.CUSTOM)
-            return null;
+        assert !def.isCustom();
 
         CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
                                                        .withId(baseMetadata.cfId)
                                                        .withPartitioner(new LocalPartitioner(comparator))
-                                                       .addPartitionKey(def.name, def.type);
+                                                       .addPartitionKey(def.indexedColumn(baseMetadata).name, comparator);
 
-        if (def.getIndexType() == IndexType.COMPOSITES)
+        if (def.isComposites())
         {
             CompositesIndex.addIndexClusteringColumns(builder, baseMetadata, def);
         }
         else
         {
-            assert def.getIndexType() == IndexType.KEYS;
-            KeysIndex.addIndexClusteringColumns(builder, baseMetadata, def);
+            assert def.isKeys();
+            KeysIndex.addIndexClusteringColumns(builder, baseMetadata);
         }
 
         return builder.build().reloadIndexMetadataProperties(baseMetadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index aaefc9c..996c730 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -39,6 +38,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -107,14 +107,16 @@ public class SecondaryIndexManager
         for (ByteBuffer indexedColumn : indexedColumnNames)
         {
             ColumnDefinition def = baseCfs.metadata.getColumnDefinition(indexedColumn);
-            if (def == null || def.getIndexType() == null)
+            if (def == null || !baseCfs.metadata.getIndexes().get(def).isPresent())
                 removeIndexedColumn(indexedColumn);
         }
 
         // TODO: allow all ColumnDefinition type
-        for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
-            if (cdef.getIndexType() != null && !indexedColumnNames.contains(cdef.name.bytes))
-                addIndexedColumn(cdef);
+        for (IndexMetadata indexDef : baseCfs.metadata.getIndexes())
+        {
+            if (!indexedColumnNames.contains(indexDef.indexedColumn(baseCfs.metadata).name.bytes))
+                addIndexedColumn(indexDef);
+        }
 
         for (SecondaryIndex index : allIndexes)
             index.reload();
@@ -230,17 +232,16 @@ public class SecondaryIndexManager
 
     /**
      * Adds and builds a index for a column
-     * @param cdef the column definition holding the index data
+     * @param indexDef the index metadata
      * @return a future which the caller can optionally block on signaling the index is built
      */
-    public synchronized Future<?> addIndexedColumn(ColumnDefinition cdef)
+    public synchronized Future<?> addIndexedColumn(IndexMetadata indexDef)
     {
+        ColumnDefinition cdef = indexDef.indexedColumn(baseCfs.metadata);
         if (indexesByColumn.containsKey(cdef.name.bytes))
             return null;
 
-        assert cdef.getIndexType() != null;
-
-        SecondaryIndex index = SecondaryIndex.createInstance(baseCfs, cdef);
+        SecondaryIndex index = SecondaryIndex.createInstance(baseCfs, indexDef);
 
         // Keep a single instance of the index per-cf for row level indexes
         // since we want all columns to be under the index
@@ -256,14 +257,14 @@ public class SecondaryIndexManager
             else
             {
                 index = currentIndex;
-                index.addColumnDef(cdef);
-                logger.info("Creating new index : {}",cdef);
+                index.setIndexMetadata(indexDef);
+                logger.info("Creating new index : {}",indexDef.name);
             }
         }
         else
         {
             // TODO: We sould do better than throw a RuntimeException
-            if (cdef.getIndexType() == IndexType.CUSTOM && index instanceof AbstractSimplePerColumnSecondaryIndex)
+            if (indexDef.isCustom() && index instanceof AbstractSimplePerColumnSecondaryIndex)
                 throw new RuntimeException("Cannot use a subclass of AbstractSimplePerColumnSecondaryIndex as a CUSTOM index, as they assume they are CFS backed");
             index.init();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index df45ea2..abc5344 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -40,8 +41,9 @@ import org.apache.cassandra.exceptions.ConfigurationException;
  */
 public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    public static CompositesIndex create(ColumnDefinition cfDef)
+    public static CompositesIndex create(IndexMetadata indexDef, CFMetaData baseMetadata)
     {
+        ColumnDefinition cfDef = indexDef.indexedColumn(baseMetadata);
         if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
         {
             switch (((CollectionType)cfDef.type).kind)
@@ -51,9 +53,9 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
                 case SET:
                     return new CompositesIndexOnCollectionKey();
                 case MAP:
-                    if (cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
+                    if (indexDef.options.containsKey(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
                         return new CompositesIndexOnCollectionKey();
-                    else if (cfDef.hasIndexOption(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
+                    else if (indexDef.options.containsKey(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
                         return new CompositesIndexOnCollectionKeyAndValue();
                     else
                         return new CompositesIndexOnCollectionValue();
@@ -74,13 +76,14 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         throw new AssertionError();
     }
 
-    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
+    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, IndexMetadata indexDef)
     {
+        ColumnDefinition cfDef = indexDef.indexedColumn(baseMetadata);
         if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
         {
             CollectionType type = (CollectionType)cfDef.type;
             if (type.kind == CollectionType.Kind.LIST
-                || (type.kind == CollectionType.Kind.MAP && cfDef.hasIndexOption(SecondaryIndex.INDEX_VALUES_OPTION_NAME)))
+                || (type.kind == CollectionType.Kind.MAP && indexDef.options.containsKey(SecondaryIndex.INDEX_VALUES_OPTION_NAME)))
             {
                 CompositesIndexOnCollectionValue.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
             }
@@ -125,10 +128,10 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         return new CompositesSearcher(baseCfs.indexManager, columns);
     }
 
-    public void validateOptions() throws ConfigurationException
+    public void validateOptions(CFMetaData baseCfm, IndexMetadata indexMetadata) throws ConfigurationException
     {
-        ColumnDefinition columnDef = columnDefs.iterator().next();
-        Map<String, String> options = new HashMap<String, String>(columnDef.getIndexOptions());
+        ColumnDefinition columnDef = indexMetadata.indexedColumn(baseCfm);
+        Map<String, String> options = new HashMap<String, String>(indexMetadata.options);
 
         // We used to have an option called "prefix_size" so skip it silently for backward compatibility sake.
         options.remove("prefix_size");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
index 478559a..384b29d 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
@@ -39,7 +40,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
  */
 public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
+    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata)
     {
         indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
     }
@@ -82,7 +83,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
         return new KeysSearcher(baseCfs.indexManager, columns);
     }
 
-    public void validateOptions() throws ConfigurationException
+    public void validateOptions(CFMetaData baseCfm, IndexMetadata def) throws ConfigurationException
     {
         // no options used
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 8373a2b..9ca657d 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -27,6 +27,8 @@ import java.util.*;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+
+import com.datastax.driver.core.TypeCodec;
 import org.apache.cassandra.utils.AbstractIterator;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -340,12 +342,48 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
 
         @Override
+        public <T> T get(int i, Class<T> aClass)
+        {
+            return row.get(i, aClass);
+        }
+
+        @Override
+        public <T> T get(int i, TypeToken<T> typeToken)
+        {
+            return row.get(i, typeToken);
+        }
+
+        @Override
+        public <T> T get(int i, TypeCodec<T> typeCodec)
+        {
+            return row.get(i, typeCodec);
+        }
+
+        @Override
         public Object getObject(String s)
         {
             return row.getObject(s);
         }
 
         @Override
+        public <T> T get(String s, Class<T> aClass)
+        {
+            return row.get(s, aClass);
+        }
+
+        @Override
+        public <T> T get(String s, TypeToken<T> typeToken)
+        {
+            return row.get(s, typeToken);
+        }
+
+        @Override
+        public <T> T get(String s, TypeCodec<T> typeCodec)
+        {
+            return row.get(s, typeCodec);
+        }
+
+        @Override
         public boolean getBool(int i)
         {
             return row.getBool(i);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06c130e3/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 5502669..5908594 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.*;
@@ -335,8 +336,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         {
             int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
             String parentName = descriptor.cfname.substring(0, i);
+            String indexName = descriptor.cfname.substring(i + 1);
             CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
-            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
+            IndexMetadata def = parent.getIndexes()
+                                      .get(indexName)
+                                      .orElseThrow(() -> new AssertionError("Could not find index metadata for index cf " + i));
             metadata = SecondaryIndex.newIndexMetadata(parent, def);
         }
         else