You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/16 18:32:47 UTC

[1/5] cassandra git commit: Remove target_columns from index metadata

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 1d94dc2b5 -> fde97c3b3
  refs/heads/trunk 5ba0adf50 -> f5cf57167


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index 0003f8f..2a43e33 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -29,12 +29,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.index.SecondaryIndexManager;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
@@ -44,6 +43,8 @@ import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.throwAssert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -63,8 +64,8 @@ public class SecondaryIndexTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, WITH_COMPOSITE_INDEX, true) .gcGraceSeconds(0),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, COMPOSITE_INDEX_TO_BE_ADDED, false) .gcGraceSeconds(0),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, WITH_COMPOSITE_INDEX, true).gcGraceSeconds(0),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, COMPOSITE_INDEX_TO_BE_ADDED, false).gcGraceSeconds(0),
                                     SchemaLoader.keysIndexCFMD(KEYSPACE1, WITH_KEYS_INDEX, true).gcGraceSeconds(0));
     }
 
@@ -435,10 +436,12 @@ public class SecondaryIndexTest
         // create a row and update the birthdate value, test that the index query fetches the new version
         new RowUpdateBuilder(cfs.metadata, 0, "k1").clustering("c").add("birthdate", 1L).build().applyUnsafe();
 
+        String indexName = "birthdate_index";
         ColumnDefinition old = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(old,
-                                                                 "birthdate_index",
-                                                                 IndexMetadata.IndexType.COMPOSITES,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
+                                                                 new IndexTarget(old.name, IndexTarget.Type.VALUES),
+                                                                 indexName,
+                                                                 IndexMetadata.Kind.COMPOSITES,
                                                                  Collections.EMPTY_MAP);
         cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
         Future<?> future = cfs.indexManager.addIndex(indexDef);
@@ -446,15 +449,11 @@ public class SecondaryIndexTest
 
         // we had a bug (CASSANDRA-2244) where index would get created but not flushed -- check for that
         // the way we find the index cfs is a bit convoluted at the moment
-        ColumnDefinition cDef = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        String indexName = getIndexNameForColumn(cfs, cDef);
-        assertNotNull(indexName);
         boolean flushed = false;
-        for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores())
-        {
-            if (SecondaryIndexManager.getIndexName(indexCfs).equals(indexName))
-                flushed = indexCfs.getLiveSSTables().size() > 0;
-        }
+        ColumnFamilyStore indexCfs = cfs.indexManager.getIndex(indexDef)
+                                                     .getBackingTable()
+                                                     .orElseThrow(throwAssert("Index not found"));
+        flushed = !indexCfs.getLiveSSTables().isEmpty();
         assertTrue(flushed);
         assertIndexedOne(cfs, ByteBufferUtil.bytes("birthdate"), 1L);
 
@@ -469,18 +468,6 @@ public class SecondaryIndexTest
         assertIndexedOne(cfs, ByteBufferUtil.bytes("birthdate"), 1L);
     }
 
-    private String getIndexNameForColumn(ColumnFamilyStore cfs, ColumnDefinition column)
-    {
-        // this is mega-ugly because there is a mismatch between the name of an index
-        // stored in schema metadata & the name used to refer to that index in other
-        // places (such as system.IndexInfo). Hopefully this is temporary and
-        // Index.getIndexName() can be made equalivalent to Index.getIndexMetadata().name
-        // (ideally even removing the former completely)
-        Collection<IndexMetadata> indexes = cfs.metadata.getIndexes().get(column);
-        assertEquals(1, indexes.size());
-        return cfs.indexManager.getIndex(indexes.iterator().next()).getIndexName();
-    }
-
     @Test
     public void testKeysSearcherSimple() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index 0ea03dc..544d482 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Callable;
 import java.util.function.BiFunction;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -70,6 +69,11 @@ public class StubIndex implements Index
         return false;
     }
 
+    public boolean dependsOn(ColumnDefinition column)
+    {
+        return false;
+    }
+
     public boolean supportsExpression(ColumnDefinition column, Operator operator)
     {
         return operator == Operator.EQ;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 0a62d9b..8695018 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -20,27 +20,27 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.index.SecondaryIndexBuilder;
-import org.apache.cassandra.index.internal.composites.CompositesSearcher;
-import org.apache.cassandra.index.internal.keys.KeysSearcher;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions;
+import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata;
+import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget;
+
 /**
  * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify
  * behaviour of flushing CFS backed CUSTOM indexes
@@ -145,14 +145,14 @@ public class CustomCassandraIndex implements Index
     private void setMetadata(IndexMetadata indexDef)
     {
         metadata = indexDef;
-        functions = getFunctions(baseCfs.metadata, indexDef);
+        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
+        functions = getFunctions(indexDef, target);
         CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              cfm.cfName,
                                                              cfm,
                                                              baseCfs.getTracker().loadsstables);
-        assert indexDef.columns.size() == 1 : "Build in indexes on multiple target columns are not supported";
-        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+        indexedColumn = target.left;
     }
 
     public Callable<?> getTruncateTask(final long truncatedAt)
@@ -174,6 +174,11 @@ public class CustomCassandraIndex implements Index
         return isPrimaryKeyIndex() || columns.contains(indexedColumn);
     }
 
+    public boolean dependsOn(ColumnDefinition column)
+    {
+        return column.equals(indexedColumn);
+    }
+
     public boolean supportsExpression(ColumnDefinition column, Operator operator)
     {
         return indexedColumn.name.equals(column.name)
@@ -668,76 +673,4 @@ public class CustomCassandraIndex implements Index
                             .map(SSTableReader::toString)
                             .collect(Collectors.joining(", "));
     }
-
-    /**
-     * Construct the CFMetadata for an index table, the clustering columns in the index table
-     * vary dependent on the kind of the indexed value.
-     * @param baseCfsMetadata
-     * @param indexMetadata
-     * @return
-     */
-    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
-    {
-        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
-        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
-        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
-        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
-                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
-                                                       .withId(baseCfsMetadata.cfId)
-                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
-                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
-
-        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
-        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
-        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
-    }
-
-    /**
-     * Factory method for new CassandraIndex instances
-     * @param baseCfs
-     * @param indexMetadata
-     * @return
-     */
-    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
-    {
-        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
-    }
-
-    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
-    {
-        if (indexDef.isKeys())
-            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
-
-        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
-        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
-        {
-            switch (((CollectionType)indexedColumn.type).kind)
-            {
-                case LIST:
-                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
-                case SET:
-                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
-                case MAP:
-                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
-                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
-                    else
-                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
-            }
-        }
-
-        switch (indexedColumn.kind)
-        {
-            case CLUSTERING:
-                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
-            case REGULAR:
-                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
-            case PARTITION_KEY:
-                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
-            //case COMPACT_VALUE:
-            //    return new CompositesIndexOnCompactValue();
-        }
-        throw new AssertionError();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index ee73b2b..f348c30 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -44,14 +44,13 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.OldNetworkTopologyStrategy;
 import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.Util.throwAssert;
 import static org.apache.cassandra.cql3.CQLTester.assertRows;
 import static org.apache.cassandra.cql3.CQLTester.row;
 import static org.junit.Assert.assertEquals;
@@ -501,6 +500,7 @@ public class DefsTest
         // persist keyspace definition in the system keyspace
         SchemaKeyspace.makeCreateKeyspaceMutation(Schema.instance.getKSMetaData(KEYSPACE6), FBUtilities.timestampMicros()).applyUnsafe();
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE6).getColumnFamilyStore(TABLE1i);
+        String indexName = "birthdate_key_index";
 
         // insert some data.  save the sstable descriptor so we can make sure it's marked for delete after the drop
         QueryProcessor.executeInternal(String.format(
@@ -510,37 +510,17 @@ public class DefsTest
                                        "key0", "col0", 1L, 1L);
 
         cfs.forceBlockingFlush();
-        ColumnDefinition indexedColumn = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        IndexMetadata index = cfs.metadata.getIndexes()
-                                          .get(indexedColumn)
-                                          .iterator()
-                                          .next();
-        ColumnFamilyStore indexCfs = cfs.indexManager.listIndexes()
-                                                     .stream()
-                                                     .filter(i -> i.getIndexMetadata().equals(index))
-                                                     .map(Index::getBackingTable)
-                                                     .findFirst()
-                                                     .orElseThrow(() -> new AssertionError("Index not found"))
-                                                     .orElseThrow(() -> new AssertionError("Index has no backing table"));
+        ColumnFamilyStore indexCfs = cfs.indexManager.getIndexByName(indexName)
+                                                     .getBackingTable()
+                                                     .orElseThrow(throwAssert("Cannot access index cfs"));
         Descriptor desc = indexCfs.getLiveSSTables().iterator().next().descriptor;
 
         // drop the index
         CFMetaData meta = cfs.metadata.copy();
-        // We currently have a mismatch between IndexMetadata.name (which is simply the name
-        // of the index) and what gets returned from SecondaryIndex#getIndexName() (usually, this
-        // defaults to <tablename>.<indexname>.
-        // IndexMetadata takes its lead from the prior implementation of ColumnDefinition.name
-        // which did not include the table name.
-        // This mismatch causes some other, long standing inconsistencies:
-        // nodetool rebuild_index <ks> <tbl> <idx>  - <idx> must be qualified, i.e. include the redundant table name
-        //                                            without it, the rebuild silently fails
-        // system.IndexInfo (which is also exposed over JMX as CF.BuildIndexes) uses the form <tbl>.<idx>
-        // cqlsh> describe index [<ks>.]<idx>  - here <idx> must not be qualified by the table name.
-        //
-        // This should get resolved as part of #9459 by better separating the index name from the
-        // name of it's underlying CFS (if it as one), as the comment in CFMetaData#indexColumnFamilyName promises
-        // Then we will be able to just use the value of SI#getIndexName() when removing an index from CFMetaData
-        IndexMetadata existing = meta.getIndexes().iterator().next();
+        IndexMetadata existing = cfs.metadata.getIndexes()
+                                             .get(indexName)
+                                             .orElseThrow(throwAssert("Index not found"));
+
         meta.indexes(meta.getIndexes().without(existing.name));
         MigrationManager.announceColumnFamilyUpdate(meta, false);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index d841e91..7124e40 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.thrift.ThriftConversion;
 
 import static java.lang.String.format;
@@ -490,7 +491,7 @@ public class LegacySchemaMigratorTest
         {
             IndexMetadata i = index.get();
             adder.add("index_name", i.name);
-            adder.add("index_type", i.indexType.toString());
+            adder.add("index_type", i.kind.toString());
             adder.add("index_options", json(i.options));
         }
         else
@@ -504,11 +505,14 @@ public class LegacySchemaMigratorTest
     }
 
     private static Optional<IndexMetadata> findIndexForColumn(Indexes indexes,
-                                                                CFMetaData table,
-                                                                ColumnDefinition column)
+                                                              CFMetaData table,
+                                                              ColumnDefinition column)
     {
+        // makes the assumption that the string option denoting the
+        // index targets can be parsed by CassandraIndex.parseTarget
+        // which should be true for any pre-3.0 index
         for (IndexMetadata index : indexes)
-            if (index.indexedColumn(table).equals(column))
+          if (CassandraIndex.parseTarget(table, index).left.equals(column))
                 return Optional.of(index);
 
         return Optional.empty();


[2/5] cassandra git commit: Remove target_columns from index metadata

Posted by sa...@apache.org.
Remove target_columns from index metadata

Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for
CASSANDRA-10216


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fde97c3b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fde97c3b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fde97c3b

Branch: refs/heads/cassandra-3.0
Commit: fde97c3b3d93901722ee2975552909e013b48b65
Parents: 1d94dc2
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Sep 16 11:09:02 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 16 17:25:23 2015 +0100

----------------------------------------------------------------------
 ...ore-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar | Bin 2218913 -> 0 bytes
 ...ore-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar | Bin 0 -> 2216851 bytes
 ...iver-internal-only-3.0.0a2.post0-96883eb.zip | Bin 230630 -> 0 bytes
 ...iver-internal-only-3.0.0a2.post0-fecbd54.zip | Bin 0 -> 230674 bytes
 .../org/apache/cassandra/config/CFMetaData.java |  13 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |   3 +-
 .../cql3/statements/AlterTableStatement.java    |  23 ++--
 .../cql3/statements/CreateIndexStatement.java   |  38 ++----
 .../cql3/statements/IndexPropDefs.java          |   5 +
 .../cassandra/cql3/statements/IndexTarget.java  | 102 +++++++++-------
 src/java/org/apache/cassandra/index/Index.java  |  13 ++
 .../cassandra/index/SecondaryIndexManager.java  |  19 ++-
 .../index/internal/CassandraIndex.java          | 111 +++++++++++++----
 .../apache/cassandra/schema/IndexMetadata.java  | 119 ++++++++++---------
 .../org/apache/cassandra/schema/Indexes.java    |  34 ------
 .../cassandra/schema/LegacySchemaMigrator.java  |  31 ++---
 .../apache/cassandra/schema/SchemaKeyspace.java |  46 +------
 .../cassandra/thrift/ThriftConversion.java      |  65 +++++++---
 .../unit/org/apache/cassandra/SchemaLoader.java |  17 ++-
 .../entities/FrozenCollectionsTest.java         |   6 +-
 .../validation/entities/SecondaryIndexTest.java |  67 ++++++++++-
 .../org/apache/cassandra/db/CleanupTest.java    |   4 +-
 .../apache/cassandra/db/DirectoriesTest.java    |   6 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  16 ++-
 .../apache/cassandra/db/SecondaryIndexTest.java |  41 +++----
 .../org/apache/cassandra/index/StubIndex.java   |   6 +-
 .../index/internal/CustomCassandraIndex.java    |  93 ++-------------
 .../org/apache/cassandra/schema/DefsTest.java   |  38 ++----
 .../schema/LegacySchemaMigratorTest.java        |  12 +-
 29 files changed, 494 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar
deleted file mode 100644
index fc5d2f0..0000000
Binary files a/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..79d5a8a
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip
deleted file mode 100644
index e55b4c3..0000000
Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip
new file mode 100644
index 0000000..fe9e0de
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index cf1dbbf..929a34a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -957,9 +957,18 @@ public final class CFMetaData
         {
             throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
         }
-        else if (getIndexes().hasIndexFor(def))
+
+        if (!getIndexes().isEmpty())
         {
-            throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", from));
+            ColumnFamilyStore store = Keyspace.openAndGetStore(this);
+            Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def);
+            if (!dependentIndexes.isEmpty())
+                throw new InvalidRequestException(String.format("Cannot rename column %s because it has " +
+                                                                "dependent secondary indexes (%s)",
+                                                                from,
+                                                                dependentIndexes.stream()
+                                                                                .map(i -> i.name)
+                                                                                .collect(Collectors.joining(","))));
         }
 
         ColumnDefinition newDef = def.withNewName(to);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index f6d54f5..b552165 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -726,7 +726,8 @@ createIndexStatement returns [CreateIndexStatement expr]
     ;
 
 indexIdent returns [IndexTarget.Raw id]
-    : c=cident                   { $id = IndexTarget.Raw.valuesOf(c); }
+    : c=cident                   { $id = IndexTarget.Raw.simpleIndexOn(c); }
+    | K_VALUES '(' c=cident ')'  { $id = IndexTarget.Raw.valuesOf(c); }
     | K_KEYS '(' c=cident ')'    { $id = IndexTarget.Raw.keysOf(c); }
     | K_ENTRIES '(' c=cident ')' { $id = IndexTarget.Raw.keysAndValuesOf(c); }
     | K_FULL '(' c=cident ')'    { $id = IndexTarget.Raw.fullCollection(c); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index af9a75c..0d2011b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 
@@ -26,6 +27,8 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -268,17 +271,21 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         break;
                 }
 
-                // If the dropped column is the only target column of a secondary
-                // index (and it's only possible to create an index with TargetType.COLUMN
-                // and a single target right now) we need to also drop the index.
+                // If the dropped column is required by any secondary indexes
+                // we reject the operation, as the indexes must be dropped first
                 Indexes allIndexes = cfm.getIndexes();
-                Collection<IndexMetadata> indexes = allIndexes.get(def);
-                for (IndexMetadata index : indexes)
+                if (!allIndexes.isEmpty())
                 {
-                    assert index.columns.size() == 1 : String.format("Can't drop column %s as it's a target of multi-column index %s", def.name, index.name);
-                    allIndexes = allIndexes.without(index.name);
+                    ColumnFamilyStore store = Keyspace.openAndGetStore(cfm);
+                    Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def);
+                    if (!dependentIndexes.isEmpty())
+                        throw new InvalidRequestException(String.format("Cannot drop column %s because it has " +
+                                                                        "dependent secondary indexes (%s)",
+                                                                        def,
+                                                                        dependentIndexes.stream()
+                                                                                        .map(i -> i.name)
+                                                                                        .collect(Collectors.joining(","))));
                 }
-                cfm.indexes(allIndexes);
 
                 // If a column is dropped which is included in a view, we don't allow the drop to take place.
                 boolean rejectAlter = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 19d89b0..6cc416d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,9 +80,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         IndexTarget target = rawTarget.prepare(cfm);
         ColumnDefinition cd = cfm.getColumnDefinition(target.column);
 
-        if (cd == null)
-            throw new InvalidRequestException("No column definition found for column " + target.column);
-
         boolean isMap = cd.type instanceof MapType;
         boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
 
@@ -94,7 +90,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         else
         {
             validateNotFullIndex(target);
-            validateIsValuesIndexIfTargetColumnNotCollection(cd, target);
+            validateIsSimpleIndexIfTargetColumnNotCollection(cd, target);
             validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
         }
 
@@ -136,7 +132,9 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     private void validateForFrozenCollection(IndexTarget target) throws InvalidRequestException
     {
         if (target.type != IndexTarget.Type.FULL)
-            throw new InvalidRequestException(String.format("Cannot create index on %s of frozen<map> column %s", target.type, target.column));
+            throw new InvalidRequestException(String.format("Cannot create %s() index on frozen column %s. " +
+                                                            "Frozen collections only support full() indexes",
+                                                            target.type, target.column));
     }
 
     private void validateNotFullIndex(IndexTarget target) throws InvalidRequestException
@@ -145,11 +143,12 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             throw new InvalidRequestException("full() indexes can only be created on frozen collections");
     }
 
-    private void validateIsValuesIndexIfTargetColumnNotCollection(ColumnDefinition cd, IndexTarget target) throws InvalidRequestException
+    private void validateIsSimpleIndexIfTargetColumnNotCollection(ColumnDefinition cd, IndexTarget target) throws InvalidRequestException
     {
-        if (!cd.type.isCollection() && target.type != IndexTarget.Type.VALUES)
-            throw new InvalidRequestException(String.format("Cannot create index on %s of column %s; only non-frozen collections support %s indexes",
-                                                            target.type, target.column, target.type));
+        if (!cd.type.isCollection() && target.type != IndexTarget.Type.SIMPLE)
+            throw new InvalidRequestException(String.format("Cannot create %s() index on %s. " +
+                                                            "Non-collection columns support only simple indexes",
+                                                            target.type.toString(), target.column));
     }
 
     private void validateTargetColumnIsMapIfIndexInvolvesKeys(boolean isMap, IndexTarget target) throws InvalidRequestException
@@ -180,31 +179,20 @@ public class CreateIndexStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException(String.format("Index %s already exists", acceptedName));
         }
 
-        IndexMetadata.IndexType indexType;
+        IndexMetadata.Kind kind;
         Map<String, String> indexOptions;
         if (properties.isCustom)
         {
-            indexType = IndexMetadata.IndexType.CUSTOM;
+            kind = IndexMetadata.Kind.CUSTOM;
             indexOptions = properties.getOptions();
         }
-        else if (cfm.isCompound())
-        {
-            Map<String, String> options = Collections.emptyMap();
-            // For now, we only allow indexing values for collections, but we could later allow
-            // to also index map keys, so we record that this is the values we index to make our
-            // lives easier then.
-            if (cd.type.isCollection() && cd.type.isMultiCell())
-                options = ImmutableMap.of(target.type.indexOption(), "");
-            indexType = IndexMetadata.IndexType.COMPOSITES;
-            indexOptions = options;
-        }
         else
         {
-            indexType = IndexMetadata.IndexType.KEYS;
             indexOptions = Collections.emptyMap();
+            kind = cfm.isCompound() ? IndexMetadata.Kind.COMPOSITES : IndexMetadata.Kind.KEYS;
         }
 
-        IndexMetadata index = IndexMetadata.singleColumnIndex(cd, acceptedName, indexType, indexOptions);
+        IndexMetadata index = IndexMetadata.singleTargetIndex(cfm, target, acceptedName, kind, indexOptions);
 
         // check to disallow creation of an index which duplicates an existing one in all but name
         Optional<IndexMetadata> existingIndex = Iterables.tryFind(cfm.getIndexes(), existing -> existing.equalsWithoutName(index));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
index e067217..b8ce7ec 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
@@ -54,6 +54,11 @@ public class IndexPropDefs extends PropertyDefinitions
         if (getRawOptions().containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
             throw new InvalidRequestException(String.format("Cannot specify %s as a CUSTOM option",
                                                             IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+
+        if (getRawOptions().containsKey(IndexTarget.TARGET_OPTION_NAME))
+            throw new InvalidRequestException(String.format("Cannot specify %s as a CUSTOM option",
+                                                            IndexTarget.TARGET_OPTION_NAME));
+
     }
 
     public Map<String, String> getRawOptions() throws SyntaxException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
index f948d8b..6210a86 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
@@ -17,15 +17,16 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public class IndexTarget
 {
+    public static final String TARGET_OPTION_NAME = "target";
     public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
 
     /**
@@ -34,22 +35,43 @@ public class IndexTarget
     public static final String INDEX_KEYS_OPTION_NAME = "index_keys";
 
     /**
-     * The name of the option used to specify that the index is on the collection values.
+     * The name of the option used to specify that the index is on the collection (map) entries.
      */
-    public static final String INDEX_VALUES_OPTION_NAME = "index_values";
+    public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
 
     /**
-     * The name of the option used to specify that the index is on the collection (map) entries.
+     * Regex for *unquoted* column names, anything which does not match this pattern must be a quoted name
      */
-    public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
+    private static final Pattern COLUMN_IDENTIFIER_PATTERN = Pattern.compile("[a-z_0-9]+");
 
     public final ColumnIdentifier column;
+    public final boolean quoteName;
     public final Type type;
 
-    private IndexTarget(ColumnIdentifier column, Type type)
+    public IndexTarget(ColumnIdentifier column, Type type)
     {
         this.column = column;
         this.type = type;
+
+        // if the column name contains anything other than lower case alphanumerics
+        // or underscores, then it must be quoted when included in the target string
+        quoteName = !COLUMN_IDENTIFIER_PATTERN.matcher(column.toString()).matches();
+    }
+
+    public String asCqlString(CFMetaData cfm)
+    {
+        if (! cfm.getColumnDefinition(column).type.isCollection())
+            return maybeEscapeQuotedName(column.toString());
+
+        return String.format("%s(%s)", type.toString(), maybeEscapeQuotedName(column.toString()));
+    }
+
+    // Quoted column names may themselves contain quotes, these need
+    // to be escaped with a preceding quote when written out as cql.
+    // Of course, the escaped name also needs to be wrapped in quotes.
+    private String maybeEscapeQuotedName(String name)
+    {
+        return quoteName ? '\"' + name.replace("\"", "\"\"") + '\"' : name;
     }
 
     public static class Raw
@@ -63,6 +85,11 @@ public class IndexTarget
             this.type = type;
         }
 
+        public static Raw simpleIndexOn(ColumnIdentifier.Raw c)
+        {
+            return new Raw(c, Type.SIMPLE);
+        }
+
         public static Raw valuesOf(ColumnIdentifier.Raw c)
         {
             return new Raw(c, Type.VALUES);
@@ -85,13 +112,24 @@ public class IndexTarget
 
         public IndexTarget prepare(CFMetaData cfm)
         {
-            return new IndexTarget(column.prepare(cfm), type);
+            // Until we've prepared the target column, we can't be certain about the target type
+            // because (for backwards compatibility) an index on a collection's values uses the
+            // same syntax as an index on a regular column (i.e. the 'values' in
+            // 'CREATE INDEX on table(values(collection));' is optional). So we correct the target type
+            // when the target column is a collection & the target type is SIMPLE.
+            ColumnIdentifier colId = column.prepare(cfm);
+            ColumnDefinition columnDef = cfm.getColumnDefinition(colId);
+            if (columnDef == null)
+                throw new InvalidRequestException("No column definition found for column " + colId);
+
+            Type actualType = (type == Type.SIMPLE && columnDef.type.isCollection()) ? Type.VALUES : type;
+            return new IndexTarget(colId, actualType);
         }
     }
 
     public static enum Type
     {
-        VALUES, KEYS, KEYS_AND_VALUES, FULL;
+        VALUES, KEYS, KEYS_AND_VALUES, FULL, SIMPLE;
 
         public String toString()
         {
@@ -100,44 +138,26 @@ public class IndexTarget
                 case KEYS: return "keys";
                 case KEYS_AND_VALUES: return "entries";
                 case FULL: return "full";
-                default: return "values";
-            }
-        }
-
-        public String indexOption()
-        {
-            switch (this)
-            {
-                case KEYS: return INDEX_KEYS_OPTION_NAME;
-                case KEYS_AND_VALUES: return INDEX_ENTRIES_OPTION_NAME;
-                case VALUES: return INDEX_VALUES_OPTION_NAME;
-                default: throw new AssertionError();
+                case VALUES: return "values";
+                case SIMPLE: return "";
+                default: return "";
             }
         }
 
-        public static Type fromIndexMetadata(IndexMetadata index, CFMetaData cfm)
+        public static Type fromString(String s)
         {
-            Map<String, String> options = index.options;
-            if (options.containsKey(INDEX_KEYS_OPTION_NAME))
-            {
+            if ("".equals(s))
+                return SIMPLE;
+            else if ("values".equals(s))
+                return VALUES;
+            else if ("keys".equals(s))
                 return KEYS;
-            }
-            else if (options.containsKey(INDEX_ENTRIES_OPTION_NAME))
-            {
+            else if ("entries".equals(s))
                 return KEYS_AND_VALUES;
-            }
-            else
-            {
-                ColumnDefinition cd = index.indexedColumn(cfm);
-                if (cd.type.isCollection() && !cd.type.isMultiCell())
-                {
-                    return FULL;
-                }
-                else
-                {
-                    return VALUES;
-                }
-            }
+            else if ("full".equals(s))
+                return FULL;
+
+            throw new AssertionError("Unrecognized index target type " + s);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 5dc44a4..f07baad 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -202,6 +202,19 @@ public interface Index
      */
     public boolean indexes(PartitionColumns columns);
 
+    /**
+     * Called to determine whether this index targets a specific column.
+     * Used during schema operations such as when dropping or renaming a column, to check if
+     * the index will be affected by the change. Typically, if an index answers that it does
+     * depend upon a column, then schema operations on that column are not permitted until the index
+     * is dropped or altered.
+     *
+     * @param column the column definition to check
+     * @return true if the index depends on the supplied column being present; false if the column may be
+     *              safely dropped or modified without adversely affecting the index
+     */
+    public boolean dependsOn(ColumnDefinition column);
+
     // TODO : this will change when we decouple indexes from specific columns for real per-row indexes
     /**
      * Called to determine whether this index can provide a searcher to execute a query on the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index ff4567b..1af2f6e 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -31,12 +31,10 @@ import com.google.common.collect.Maps;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.db.Directories;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -188,6 +186,21 @@ public class SecondaryIndexManager implements IndexRegistry
         }
     }
 
+
+    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
+    {
+        if (indexes.isEmpty())
+            return Collections.emptySet();
+
+        Set<IndexMetadata> dependentIndexes = new HashSet<>();
+        for (Index index : indexes.values())
+            if (index.dependsOn(column))
+                dependentIndexes.add(index.getIndexMetadata());
+
+        return dependentIndexes;
+    }
+
+
     /**
      * Called when dropping a Table
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 89c072c..d10af1f 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -5,14 +5,18 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.*;
@@ -38,6 +42,7 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -49,13 +54,15 @@ public abstract class CassandraIndex implements Index
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 
+    public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
+
     public final ColumnFamilyStore baseCfs;
     protected IndexMetadata metadata;
     protected ColumnFamilyStore indexCfs;
     protected ColumnDefinition indexedColumn;
     protected CassandraIndexFunctions functions;
 
-    public CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
     {
         this.baseCfs = baseCfs;
         setMetadata(indexDef);
@@ -69,7 +76,7 @@ public abstract class CassandraIndex implements Index
      */
     protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
     {
-        return operator.equals(Operator.EQ);
+        return operator == Operator.EQ;
     }
 
     /**
@@ -77,7 +84,6 @@ public abstract class CassandraIndex implements Index
      * The clustering columns in the index table encode the values required to retrieve the correct data from the base
      * table and varies depending on the kind of the indexed column. See indexCfsMetadata for more details
      * Used whenever a row in the index table is written or deleted.
-     * @param metadata
      * @param partitionKey from the base data being indexed
      * @param prefix from the base data being indexed
      * @param path from the base data being indexed
@@ -90,7 +96,6 @@ public abstract class CassandraIndex implements Index
     /**
      * Used at search time to convert a row in the index table into a simple struct containing the values required
      * to retrieve the corresponding row from the base table.
-     * @param metadata
      * @param indexedValue the partition key of the indexed table (i.e. the value that was indexed)
      * @param indexEntry a row from the index table
      * @return
@@ -102,7 +107,6 @@ public abstract class CassandraIndex implements Index
      * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
      * Used at read time to identify out of date index entries so that they can be excluded from search results and
      * repaired
-     * @param metadata required to get the indexed column definition
      * @param row the current row from the primary data table
      * @param indexValue the value we retrieved from the index
      * @param nowInSec
@@ -112,7 +116,6 @@ public abstract class CassandraIndex implements Index
 
     /**
      * Extract the value to be inserted into the index from the components of the base data
-     * @param metadata
      * @param partitionKey from the primary data
      * @param clustering from the primary data
      * @param path from the primary data
@@ -197,14 +200,14 @@ public abstract class CassandraIndex implements Index
     private void setMetadata(IndexMetadata indexDef)
     {
         metadata = indexDef;
-        functions = getFunctions(baseCfs.metadata, indexDef);
+        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
+        functions = getFunctions(indexDef, target);
         CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              cfm.cfName,
                                                              cfm,
                                                              baseCfs.getTracker().loadsstables);
-        assert indexDef.columns.size() == 1 : "Build in indexes on multiple target columns are not supported";
-        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+        indexedColumn = target.left;
     }
 
     public Callable<?> getTruncateTask(final long truncatedAt)
@@ -227,6 +230,11 @@ public abstract class CassandraIndex implements Index
         return isPrimaryKeyIndex() || columns.contains(indexedColumn);
     }
 
+    public boolean dependsOn(ColumnDefinition column)
+    {
+        return indexedColumn.name.equals(column.name);
+    }
+
     public boolean supportsExpression(ColumnDefinition column, Operator operator)
     {
         return indexedColumn.name.equals(column.name)
@@ -269,7 +277,7 @@ public abstract class CassandraIndex implements Index
         if (target.isPresent())
         {
             target.get().validateForIndexing();
-            switch (getIndexMetadata().indexType)
+            switch (getIndexMetadata().kind)
             {
                 case COMPOSITES:
                     return new CompositesSearcher(command, target.get(), this);
@@ -277,7 +285,7 @@ public abstract class CassandraIndex implements Index
                     return new KeysSearcher(command, target.get(), this);
                 default:
                     throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
-                                                                  metadata.indexType,
+                                                                  metadata.kind,
                                                                   metadata.name,
                                                                   indexedColumn.name.toString()));
             }
@@ -531,7 +539,7 @@ public abstract class CassandraIndex implements Index
     private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
     {
         assert indexedColumn.isPartitionKey();
-        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null ));
+        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
     }
 
     private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
@@ -613,7 +621,7 @@ public abstract class CassandraIndex implements Index
         Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
         CompactionManager.instance.interruptCompactionForCFs(cfss, true);
         CompactionManager.instance.waitForCessation(cfss);
-        indexCfs.keyspace.writeOrder.awaitNewBarrier();
+        Keyspace.writeOrder.awaitNewBarrier();
         indexCfs.forceBlockingFlush();
         indexCfs.readOrdering.awaitNewBarrier();
         indexCfs.invalidate();
@@ -695,8 +703,9 @@ public abstract class CassandraIndex implements Index
      */
     public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
     {
-        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
-        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
+        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
+        CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
+        ColumnDefinition indexedColumn = target.left;
         AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
         CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
                                                                baseCfsMetadata.indexColumnFamilyName(indexMetadata))
@@ -715,17 +724,65 @@ public abstract class CassandraIndex implements Index
      * @param indexMetadata
      * @return
      */
-    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    {
+        return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
+    }
+
+    // Public because it's also used to convert index metadata into a thrift-compatible format
+    public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
+                                                                       IndexMetadata indexDef)
     {
-        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
+        String target = indexDef.options.get("target");
+        assert target != null : String.format("No target definition found for index %s", indexDef.name);
+
+        // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
+        // if not, then it must be a simple column name and implicitly its type is VALUES
+        Matcher matcher = TARGET_REGEX.matcher(target);
+        String columnName;
+        IndexTarget.Type targetType;
+        if (matcher.matches())
+        {
+            targetType = IndexTarget.Type.fromString(matcher.group(1));
+            columnName = matcher.group(2);
+        }
+        else
+        {
+            columnName = target;
+            targetType = IndexTarget.Type.VALUES;
+        }
+
+        // in the case of a quoted column name the name in the target string
+        // will be enclosed in quotes, which we need to unwrap. It may also
+        // include quote characters internally, escaped like so:
+        //      abc"def -> abc""def.
+        // Because the target string is stored in a CQL compatible form, we
+        // need to un-escape any such quotes to get the actual column name
+        if (columnName.startsWith("\""))
+        {
+            columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
+            columnName = columnName.replaceAll("\"\"", "\"");
+        }
+
+        // if it's not a CQL table, we can't assume that the column name is utf8, so
+        // in that case we have to do a linear scan of the cfm's columns to get the matching one
+        if (cfm.isCQLTable())
+            return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
+        else
+            for (ColumnDefinition column : cfm.allColumns())
+                if (column.name.toString().equals(columnName))
+                    return Pair.create(column, targetType);
+
+        throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
     }
 
-    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
+    static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
+                                                Pair<ColumnDefinition, IndexTarget.Type> target)
     {
         if (indexDef.isKeys())
             return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 
-        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
+        ColumnDefinition indexedColumn = target.left;
         if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
         {
             switch (((CollectionType)indexedColumn.type).kind)
@@ -735,12 +792,16 @@ public abstract class CassandraIndex implements Index
                 case SET:
                     return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
                 case MAP:
-                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
-                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
-                    else
-                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                    switch (target.right)
+                    {
+                        case KEYS:
+                            return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                        case KEYS_AND_VALUES:
+                            return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+                        case VALUES:
+                            return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                    }
+                    throw new AssertionError();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 6846a14..8052e9e 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.schema;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
@@ -50,53 +52,78 @@ public final class IndexMetadata
 
     public static final Serializer serializer = new Serializer();
 
-    public enum IndexType
+    public enum Kind
     {
         KEYS, CUSTOM, COMPOSITES
     }
 
-    public enum TargetType
-    {
-        COLUMN, ROW
-    }
-
     // UUID for serialization. This is a deterministic UUID generated from the index name
     // Both the id and name are guaranteed unique per keyspace.
     public final UUID id;
     public final String name;
-    public final IndexType indexType;
-    public final TargetType targetType;
+    public final Kind kind;
     public final Map<String, String> options;
-    public final Set<ColumnIdentifier> columns;
 
     private IndexMetadata(String name,
                           Map<String, String> options,
-                          IndexType indexType,
-                          TargetType targetType,
-                          Set<ColumnIdentifier> columns)
+                          Kind kind)
     {
         this.id = UUID.nameUUIDFromBytes(name.getBytes());
         this.name = name;
         this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
-        this.indexType = indexType;
-        this.targetType = targetType;
-        this.columns = columns == null ? ImmutableSet.of() : ImmutableSet.copyOf(columns);
+        this.kind = kind;
     }
 
-    public static IndexMetadata singleColumnIndex(ColumnIdentifier column,
-                                                  String name,
-                                                  IndexType type,
-                                                  Map<String, String> options)
+    /**
+     * Builds metadata from a legacy index definition: a legacy keys/entries marker option,
+     * if present, is replaced by the TARGET_OPTION_NAME option. Null options mean no options.
+     */
+    public static IndexMetadata fromLegacyMetadata(CFMetaData cfm,
+                                                   ColumnDefinition column,
+                                                   String name,
+                                                   Kind kind,
+                                                   Map<String, String> options)
+    {
+        Map<String, String> newOptions = new HashMap<>();
+        if (options != null)
+            newOptions.putAll(options);
+
+        IndexTarget target;
+        if (newOptions.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
+        {
+            newOptions.remove(IndexTarget.INDEX_KEYS_OPTION_NAME);
+            target = new IndexTarget(column.name, IndexTarget.Type.KEYS);
+        }
+        else if (newOptions.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
+        {
+            // drop the entries marker that selected this branch, not the keys marker
+            newOptions.remove(IndexTarget.INDEX_ENTRIES_OPTION_NAME);
+            target = new IndexTarget(column.name, IndexTarget.Type.KEYS_AND_VALUES);
+        }
+        else
+        {
+            // no legacy marker: a collection that is not multi-cell gets FULL, anything else VALUES
+            boolean nonMultiCell = column.type.isCollection() && !column.type.isMultiCell();
+            target = new IndexTarget(column.name, nonMultiCell ? IndexTarget.Type.FULL : IndexTarget.Type.VALUES);
+        }
+        newOptions.put(IndexTarget.TARGET_OPTION_NAME, target.asCqlString(cfm));
+        return new IndexMetadata(name, newOptions, kind);
+    }
+
+    public static IndexMetadata fromSchemaMetadata(String name, Kind kind, Map<String, String> options)
     {
-        return new IndexMetadata(name, options, type, TargetType.COLUMN, Collections.singleton(column));
+        return new IndexMetadata(name, options, kind);
     }
 
-    public static IndexMetadata singleColumnIndex(ColumnDefinition column,
+    public static IndexMetadata singleTargetIndex(CFMetaData cfm,
+                                                  IndexTarget target,
                                                   String name,
-                                                  IndexType type,
+                                                  Kind kind,
                                                   Map<String, String> options)
     {
-        return singleColumnIndex(column.name, name, type, options);
+        Map<String, String> newOptions = options == null ? new HashMap<>() : new HashMap<>(options); // null-safe copy
+        newOptions.put(IndexTarget.TARGET_OPTION_NAME, target.asCqlString(cfm));
+        return new IndexMetadata(name, newOptions, kind);
     }
 
     public static boolean isNameValid(String name)
@@ -115,13 +142,10 @@ public final class IndexMetadata
         if (!isNameValid(name))
             throw new ConfigurationException("Illegal index name " + name);
 
-        if (indexType == null)
-            throw new ConfigurationException("Index type is null for index " + name);
+        if (kind == null)
+            throw new ConfigurationException("Index kind is null for index " + name);
 
-        if (targetType == null)
-            throw new ConfigurationException("Target type is null for index " + name);
-
-        if (indexType == IndexMetadata.IndexType.CUSTOM)
+        if (kind == Kind.CUSTOM)
         {
             if (options == null || !options.containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
                 throw new ConfigurationException(String.format("Required option missing for index %s : %s",
@@ -169,47 +193,29 @@ public final class IndexMetadata
         }
     }
 
-    // to be removed in CASSANDRA-10124 with multi-target & row based indexes
-    public ColumnDefinition indexedColumn(CFMetaData cfm)
-    {
-       return cfm.getColumnDefinition(columns.iterator().next());
-    }
-
     public boolean isCustom()
     {
-        return indexType == IndexType.CUSTOM;
+        return kind == Kind.CUSTOM;
     }
 
     public boolean isKeys()
     {
-        return indexType == IndexType.KEYS;
+        return kind == Kind.KEYS;
     }
 
     public boolean isComposites()
     {
-        return indexType == IndexType.COMPOSITES;
-    }
-
-    public boolean isRowIndex()
-    {
-        return targetType == TargetType.ROW;
-    }
-
-    public boolean isColumnIndex()
-    {
-        return targetType == TargetType.COLUMN;
+        return kind == Kind.COMPOSITES;
     }
 
     public int hashCode()
     {
-        return Objects.hashCode(id, name, indexType, targetType, options, columns);
+        return Objects.hashCode(id, name, kind, options);
     }
 
     public boolean equalsWithoutName(IndexMetadata other)
     {
-        return Objects.equal(indexType, other.indexType)
-            && Objects.equal(targetType, other.targetType)
-            && Objects.equal(columns, other.columns)
+        return Objects.equal(kind, other.kind)
             && Objects.equal(options, other.options);
     }
 
@@ -231,9 +237,7 @@ public final class IndexMetadata
         return new ToStringBuilder(this)
             .append("id", id.toString())
             .append("name", name)
-            .append("indexType", indexType)
-            .append("targetType", targetType)
-            .append("columns", columns)
+            .append("kind", kind)
             .append("options", options)
             .build();
     }
@@ -255,6 +259,5 @@ public final class IndexMetadata
         {
             return UUIDSerializer.serializer.serializedSize(metadata.id, version);
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 9114f63..422a94c 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -21,9 +21,7 @@ package org.apache.cassandra.schema;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
 
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 
@@ -42,13 +40,11 @@ public class Indexes implements Iterable<IndexMetadata>
 {
     private final ImmutableMap<String, IndexMetadata> indexesByName;
     private final ImmutableMap<UUID, IndexMetadata> indexesById;
-    private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
 
     private Indexes(Builder builder)
     {
         indexesByName = builder.indexesByName.build();
         indexesById = builder.indexesById.build();
-        indexesByColumn = builder.indexesByColumn.build();
     }
 
     public static Builder builder()
@@ -121,28 +117,6 @@ public class Indexes implements Iterable<IndexMetadata>
     }
 
     /**
-     * Get the index associated with the specified column. This may be removed or modified as support is added
-     * for indexes with multiple target columns and with TargetType.ROW
-     *
-     * @param column a column definition for which an {@link IndexMetadata} is being sought
-     * @return an empty {@link Optional} if the named index is not found; a non-empty optional of {@link IndexMetadata} otherwise
-     */
-    public Collection<IndexMetadata> get(ColumnDefinition column)
-    {
-        return indexesByColumn.get(column.name);
-    }
-
-    /**
-     * Answer true if an index is associated with the specified column.
-     * @param column
-     * @return
-     */
-    public boolean hasIndexFor(ColumnDefinition column)
-    {
-        return !indexesByColumn.get(column.name).isEmpty();
-    }
-
-    /**
      * Create a SecondaryIndexes instance with the provided index added
      */
     public Indexes with(IndexMetadata index)
@@ -206,7 +180,6 @@ public class Indexes implements Iterable<IndexMetadata>
     {
         final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>();
         final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>();
-        final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
 
         private Builder()
         {
@@ -221,13 +194,6 @@ public class Indexes implements Iterable<IndexMetadata>
         {
             indexesByName.put(index.name, index);
             indexesById.put(index.id, index);
-            // All indexes are column indexes at the moment
-            if (index.isColumnIndex())
-            {
-                for (ColumnIdentifier target : index.columns)
-                    indexesByColumn.put(target, index);
-
-            }
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 1674de8..0d5a040 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -296,15 +296,6 @@ public final class LegacySchemaMigrator
                                                                         isStaticCompactTable,
                                                                         needsUpgrade);
 
-        Indexes indexes = createIndexesFromColumnRows(columnRows,
-                                                      ksName,
-                                                      cfName,
-                                                      rawComparator,
-                                                      subComparator,
-                                                      isSuper,
-                                                      isCQLTable,
-                                                      isStaticCompactTable,
-                                                      needsUpgrade);
 
         if (needsUpgrade)
         {
@@ -328,6 +319,17 @@ public final class LegacySchemaMigrator
                                            false, // legacy schema did not contain views
                                            columnDefs,
                                            DatabaseDescriptor.getPartitioner());
+
+        Indexes indexes = createIndexesFromColumnRows(cfm,
+                                                      columnRows,
+                                                      ksName,
+                                                      cfName,
+                                                      rawComparator,
+                                                      subComparator,
+                                                      isSuper,
+                                                      isCQLTable,
+                                                      isStaticCompactTable,
+                                                      needsUpgrade);
         cfm.indexes(indexes);
 
         if (tableRow.has("dropped_columns"))
@@ -591,7 +593,8 @@ public final class LegacySchemaMigrator
         return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
     }
 
-    private static Indexes createIndexesFromColumnRows(UntypedResultSet rows,
+    private static Indexes createIndexesFromColumnRows(CFMetaData cfm,
+                                                       UntypedResultSet rows,
                                                        String keyspace,
                                                        String table,
                                                        AbstractType<?> rawComparator,
@@ -605,11 +608,11 @@ public final class LegacySchemaMigrator
 
         for (UntypedResultSet.Row row : rows)
         {
-            IndexMetadata.IndexType indexType = null;
+            IndexMetadata.Kind kind = null;
             if (row.has("index_type"))
-                indexType = IndexMetadata.IndexType.valueOf(row.getString("index_type"));
+                kind = IndexMetadata.Kind.valueOf(row.getString("index_type"));
 
-            if (indexType == null)
+            if (kind == null)
                 continue;
 
             Map<String, String> indexOptions = null;
@@ -630,7 +633,7 @@ public final class LegacySchemaMigrator
                                                                 isStaticCompactTable,
                                                                 needsUpgrade);
 
-            indexes.add(IndexMetadata.singleColumnIndex(column, indexName, indexType, indexOptions));
+            indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions));
         }
 
         return indexes.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index c922612..d8992bd 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -177,10 +177,8 @@ public final class SchemaKeyspace
                 + "keyspace_name text,"
                 + "table_name text,"
                 + "index_name text,"
-                + "index_type text,"
+                + "kind text,"
                 + "options frozen<map<text, text>>,"
-                + "target_columns frozen<set<text>>,"
-                + "target_type text,"
                 + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
 
     private static final CFMetaData Types =
@@ -1449,10 +1447,8 @@ public final class SchemaKeyspace
     {
         RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
 
-        builder.add("index_type", index.indexType.toString());
+        builder.add("kind", index.kind.toString());
         builder.frozenMap("options", index.options);
-        builder.frozenSet("target_columns", index.columns.stream().map(ColumnIdentifier::toString).collect(Collectors.toSet()));
-        builder.add("target_type", index.targetType.toString());
         builder.build();
     }
 
@@ -1481,46 +1477,16 @@ public final class SchemaKeyspace
     {
         Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
         String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
-        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(cfm, row)));
+        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row)));
         return indexes.build();
     }
 
-    private static IndexMetadata createIndexMetadataFromIndexesRow(CFMetaData cfm, UntypedResultSet.Row row)
+    private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row)
     {
         String name = row.getString("index_name");
-        IndexMetadata.IndexType type = IndexMetadata.IndexType.valueOf(row.getString("index_type"));
-        IndexMetadata.TargetType targetType = IndexMetadata.TargetType.valueOf(row.getString("target_type"));
+        IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind"));
         Map<String, String> options = row.getFrozenTextMap("options");
-        if (options == null)
-            options = Collections.emptyMap();
-
-        Set<String> targetColumnNames = row.getFrozenSet("target_columns", UTF8Type.instance);
-        assert targetType == IndexMetadata.TargetType.COLUMN : "Per row indexes with dynamic target columns are not supported yet";
-
-        Set<ColumnIdentifier> targetColumns = new HashSet<>();
-        // if it's not a CQL table, we can't assume that the column name is utf8, so
-        // in that case we have to do a linear scan of the cfm's columns to get the matching one
-        if (targetColumnNames != null)
-        {
-            assert targetColumnNames.size() == 1 : "Secondary indexes targetting multiple columns are not supported yet";
-            targetColumnNames.forEach(targetColumnName -> {
-                if (cfm.isCQLTable())
-                    targetColumns.add(ColumnIdentifier.getInterned(targetColumnName, true));
-                else
-                    findColumnIdentifierWithName(targetColumnName, cfm.allColumns()).ifPresent(targetColumns::add);
-            });
-        }
-        return IndexMetadata.singleColumnIndex(targetColumns.iterator().next(), name, type, options);
-    }
-
-    private static Optional<ColumnIdentifier> findColumnIdentifierWithName(String name,
-                                                                           Iterable<ColumnDefinition> columns)
-    {
-        for (ColumnDefinition column : columns)
-            if (column.name.toString().equals(name))
-                return Optional.of(column.name);
-
-        return Optional.empty();
+        return IndexMetadata.fromSchemaMetadata(name, type, options);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index b721226..80b6447 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.thrift;
 
 import java.util.*;
+import java.util.regex.Matcher;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -26,6 +27,7 @@ import com.google.common.collect.Maps;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.LegacyLayout;
 import org.apache.cassandra.db.WriteType;
@@ -33,6 +35,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.LocalStrategy;
@@ -287,7 +290,8 @@ public class ThriftConversion
                                                    DatabaseDescriptor.getPartitioner());
 
             // Convert any secondary indexes defined in the thrift column_metadata
-            newCFMD.indexes(indexDefsFromThrift(cf_def.keyspace,
+            newCFMD.indexes(indexDefsFromThrift(newCFMD,
+                                                cf_def.keyspace,
                                                 cf_def.name,
                                                 rawComparator,
                                                 subComparator,
@@ -526,7 +530,8 @@ public class ThriftConversion
         return defs;
     }
 
-    private static Indexes indexDefsFromThrift(String ksName,
+    private static Indexes indexDefsFromThrift(CFMetaData cfm,
+                                               String ksName,
                                                String cfName,
                                                AbstractType<?> thriftComparator,
                                                AbstractType<?> thriftSubComparator,
@@ -554,12 +559,12 @@ public class ThriftConversion
                 indexNames.add(indexName);
 
                 Map<String, String> indexOptions = def.getIndex_options();
-                IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(def.index_type.name());
+                if (indexOptions != null && indexOptions.containsKey(IndexTarget.TARGET_OPTION_NAME))
+                        throw new ConfigurationException("Reserved index option 'target' cannot be used");
 
-                indexes.add(IndexMetadata.singleColumnIndex(column,
-                                                            indexName,
-                                                            indexType,
-                                                            indexOptions));
+                IndexMetadata.Kind kind = IndexMetadata.Kind.valueOf(def.index_type.name());
+
+                indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions));
             }
         }
         return indexes.build();
@@ -572,22 +577,44 @@ public class ThriftConversion
 
         cd.setName(ByteBufferUtil.clone(column.name.bytes));
         cd.setValidation_class(column.type.toString());
-        Collection<IndexMetadata> indexes = cfMetaData.getIndexes().get(column);
-        // we include the index in the ColumnDef iff
-        //   * it is the only index on the column
-        //   * it is the only target column for the index
-        if (indexes.size() == 1)
+
+        // we include the index in the ColumnDef iff its targets are compatible with
+        // pre-3.0 indexes AND it is the only index defined on the given column, that is:
+        //   * it is the only index on the column (i.e. with this column as its target)
+        //   * it has only a single target, which matches the pattern for pre-3.0 indexes
+        //     i.e. keys/values/entries/full, with exactly 1 argument that matches the
+        //     column name OR a simple column name (for indexes on non-collection columns)
+        // n.b. it's a guess that using a pre-compiled regex and checking the group is
+        // cheaper than compiling a new regex for each column, but as this isn't on
+        // any hot path this hasn't been verified yet.
+        IndexMetadata matchedIndex = null;
+        for (IndexMetadata index : cfMetaData.getIndexes())
         {
-            IndexMetadata index = indexes.iterator().next();
-            if (index.columns.size() == 1)
+            String target = index.options.get(IndexTarget.TARGET_OPTION_NAME);
+            Matcher m = CassandraIndex.TARGET_REGEX.matcher(target);
+            if (target.equals(column.name.toString()) ||
+                (m.matches() && m.group(2).equals(column.name.toString())))
             {
-                cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(index.indexType.name()));
-                cd.setIndex_name(index.name);
-                cd.setIndex_options(index.options == null || index.options.isEmpty()
-                                    ? null
-                                    : Maps.newHashMap(index.options));
+                // we already found an index for this column, we've no option but to
+                // ignore both of them (and any others we've yet to find)
+                if (matchedIndex != null)
+                    return cd;
+
+                matchedIndex = index;
             }
         }
+
+        if (matchedIndex != null)
+        {
+            cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(matchedIndex.kind.name()));
+            cd.setIndex_name(matchedIndex.name);
+            // Strip the reserved target option and hand the filtered entries (not the
+            // unfiltered originals) to the ColumnDef.
+            Map<String, String> filteredOptions = Maps.filterKeys(matchedIndex.options,
+                                                                  s -> !IndexTarget.TARGET_OPTION_NAME.equals(s));
+            cd.setIndex_options(filteredOptions.isEmpty() ? null : Maps.newHashMap(filteredOptions));
+        }
+
         return cd;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 72bfd00..c4b99c6 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -299,9 +299,10 @@ public class SchemaLoader
 
         cfm.indexes(
             cfm.getIndexes()
-               .with(IndexMetadata.singleColumnIndex(indexedColumn,
+               .with(IndexMetadata.singleTargetIndex(cfm,
+                                                     new IndexTarget(indexedColumn.name, IndexTarget.Type.VALUES),
                                                      "indexe1",
-                                                     IndexMetadata.IndexType.CUSTOM,
+                                                     IndexMetadata.Kind.CUSTOM,
                                                      indexOptions)));
         return cfm;
     }
@@ -410,9 +411,11 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                   .with(IndexMetadata.singleTargetIndex(cfm,
+                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                         IndexTarget.Type.VALUES),
                                                          "birthdate_key_index",
-                                                         IndexMetadata.IndexType.COMPOSITES,
+                                                         IndexMetadata.Kind.COMPOSITES,
                                                          Collections.EMPTY_MAP)));
 
         return cfm.compression(getCompressionParameters());
@@ -430,9 +433,11 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                   .with(IndexMetadata.singleTargetIndex(cfm,
+                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                         IndexTarget.Type.VALUES),
                                                          "birthdate_composite_index",
-                                                         IndexMetadata.IndexType.KEYS,
+                                                         IndexMetadata.Kind.KEYS,
                                                          Collections.EMPTY_MAP)));
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index e07e421..70f7f19 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -601,8 +601,10 @@ public class FrozenCollectionsTest extends CQLTester
 
         // for now, we don't support indexing values or keys of collections in the primary key
         assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (full(a))", "Cannot create secondary index on partition key column");
-        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create index on keys of frozen<map> column");
-        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(b))", "Cannot create index on keys of frozen<map> column");
+        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create keys() index on frozen column a. " +
+                                                                              "Frozen collections only support full() indexes");
+        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(b))", "Cannot create keys() index on frozen column b. " +
+                                                                              "Frozen collections only support full() indexes");
 
         createTable("CREATE TABLE %s (a int, b frozen<list<int>>, c frozen<set<int>>, d frozen<map<int, text>>, PRIMARY KEY (a, b))");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 02b2abd..48d3a85 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -27,15 +27,19 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
 
 import static org.apache.cassandra.Util.throwAssert;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -209,7 +213,7 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalidThrow(SyntaxException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression_parameters:sstable_compressor = 'DeflateCompressor'", tableName));
 
         assertInvalidThrow(ConfigurationException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression = { 'sstable_compressor': 'DeflateCompressor' }",
-                                                                      tableName));
+                                                                       tableName));
     }
 
     /**
@@ -462,6 +466,67 @@ public class SecondaryIndexTest extends CQLTester
         assertRows(execute("select count(*) from %s where app_name='foo' and account='bar' and last_access > 4 allow filtering"), row(1L));
     }
 
+    @Test
+    public void testSyntaxVariationsForIndexOnCollectionsValue() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, m map<int, int>, l list<int>, s set<int>, PRIMARY KEY (k))");
+        createAndDropCollectionValuesIndex("m");
+        createAndDropCollectionValuesIndex("l");
+        createAndDropCollectionValuesIndex("s");
+    }
+
+    private void createAndDropCollectionValuesIndex(String columnName) throws Throwable
+    {
+        String indexName = columnName + "_idx";
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        createIndex(String.format("CREATE INDEX %s on %%s(%s)", indexName, columnName));
+        IndexMetadata indexDef = indexManager.getIndexByName(indexName).getIndexMetadata();
+        assertEquals(String.format("values(%s)", columnName), indexDef.options.get(IndexTarget.TARGET_OPTION_NAME));
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+        assertFalse(indexManager.hasIndexes());
+        createIndex(String.format("CREATE INDEX %s on %%s(values(%s))", indexName, columnName));
+        assertEquals(indexDef, indexManager.getIndexByName(indexName).getIndexMetadata());
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+    }
+
+    @Test
+    public void testCreateIndexWithQuotedColumnNames() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    " k int," +
+                    " v int, " +
+                    " lower_case_map map<int, int>," +
+                    " \"MixedCaseMap\" map<int, int>," +
+                    " lower_case_frozen_list frozen<list<int>>," +
+                    " \"UPPER_CASE_FROZEN_LIST\" frozen<list<int>>," +
+                    " \"set name with spaces\" set<int>," +
+                    " \"column_name_with\"\"escaped quote\" int," +
+                    " PRIMARY KEY (k))");
+
+        createAndDropIndexWithQuotedColumnIdentifier("\"v\"");
+        createAndDropIndexWithQuotedColumnIdentifier("keys(\"lower_case_map\")");
+        createAndDropIndexWithQuotedColumnIdentifier("keys(\"MixedCaseMap\")");
+        createAndDropIndexWithQuotedColumnIdentifier("full(\"lower_case_frozen_list\")");
+        createAndDropIndexWithQuotedColumnIdentifier("full(\"UPPER_CASE_FROZEN_LIST\")");
+        createAndDropIndexWithQuotedColumnIdentifier("values(\"set name with spaces\")");
+        createAndDropIndexWithQuotedColumnIdentifier("\"column_name_with\"\"escaped quote\"");
+    }
+
+    private void createAndDropIndexWithQuotedColumnIdentifier(String target) throws Throwable
+    {
+        String indexName = "test_mixed_case_idx";
+        createIndex(String.format("CREATE INDEX %s ON %%s(%s)", indexName, target));
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        IndexMetadata indexDef = indexManager.getIndexByName(indexName).getIndexMetadata();
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+        // verify we can re-create the index using the target string
+        createIndex(String.format("CREATE INDEX %s ON %%s(%s)",
+                                  indexName, indexDef.options.get(IndexTarget.TARGET_OPTION_NAME)));
+        assertEquals(indexDef, indexManager.getIndexByName(indexName).getIndexMetadata());
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+    }
+
+
     /**
      * Test for CASSANDRA-5732, Can not query secondary index
      * migrated from cql_tests.py:TestCQL.bug_5732_test(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 7b03640..a4aca7f 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -114,9 +114,7 @@ public class CleanupTest
         assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
 
         ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
-        String indexName = cfs.metadata.getIndexes()
-                                       .get(cdef)
-                                       .iterator().next().name;
+        String indexName = "birthdate_key_index";
         long start = System.nanoTime();
         while (!cfs.getBuiltIndexes().contains(indexName) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
             Thread.sleep(10);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 40093ea..6e51448 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.index.internal.CassandraIndex;
@@ -168,9 +169,10 @@ public class DirectoriesTest
                                   .addClusteringColumn("col", UTF8Type.instance)
                                   .build();
         ColumnDefinition col = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(col,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(PARENT_CFM,
+                                                                 new IndexTarget(col.name, IndexTarget.Type.VALUES),
                                                                  "idx",
-                                                                 IndexMetadata.IndexType.KEYS,
+                                                                 IndexMetadata.Kind.KEYS,
                                                                  Collections.emptyMap());
         PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
         CFMetaData INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index ffc43c5..e0fc68a 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -464,9 +464,10 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(cd,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
+                                                                 new IndexTarget(cd.name, IndexTarget.Type.VALUES),
                                                                  "test_index",
-                                                                 IndexMetadata.IndexType.CUSTOM,
+                                                                 IndexMetadata.Kind.CUSTOM,
                                                                  ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
                                                                                  StubIndex.class.getName()));
 
@@ -560,9 +561,10 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(cd,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
+                                                                 new IndexTarget(cd.name,IndexTarget.Type.VALUES),
                                                                  "test_index",
-                                                                 IndexMetadata.IndexType.CUSTOM,
+                                                                 IndexMetadata.Kind.CUSTOM,
                                                                  ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
                                                                                  StubIndex.class.getName()));
 
@@ -574,11 +576,7 @@ public class RangeTombstoneTest
         if (rebuild != null)
             rebuild.get();
 
-        StubIndex index = (StubIndex)cfs.indexManager.listIndexes()
-                                                     .stream()
-                                                     .filter(i -> "test_index".equals(i.getIndexName()))
-                                                     .findFirst()
-                                                     .orElseThrow(() -> new RuntimeException(new AssertionError("Index not found")));
+        StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("test_index");
         index.reset();
 
         UpdateBuilder.create(cfs.metadata, key).withTimestamp(0).newRow(1).add("val", 1).applyUnsafe();


[5/5] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by sa...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5cf5716
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5cf5716
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5cf5716

Branch: refs/heads/trunk
Commit: f5cf5716789b70f47868c8b47c540068b78c6a46
Parents: 5ba0adf fde97c3
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Sep 16 17:28:24 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 16 17:28:24 2015 +0100

----------------------------------------------------------------------
 ...ore-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar | Bin 2218913 -> 0 bytes
 ...ore-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar | Bin 0 -> 2216851 bytes
 ...iver-internal-only-3.0.0a2.post0-96883eb.zip | Bin 230630 -> 0 bytes
 ...iver-internal-only-3.0.0a2.post0-fecbd54.zip | Bin 0 -> 230674 bytes
 .../org/apache/cassandra/config/CFMetaData.java |  13 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |   3 +-
 .../cql3/statements/AlterTableStatement.java    |  23 ++--
 .../cql3/statements/CreateIndexStatement.java   |  38 ++----
 .../cql3/statements/IndexPropDefs.java          |   5 +
 .../cassandra/cql3/statements/IndexTarget.java  | 102 +++++++++-------
 src/java/org/apache/cassandra/index/Index.java  |  13 ++
 .../cassandra/index/SecondaryIndexManager.java  |  19 ++-
 .../index/internal/CassandraIndex.java          | 111 +++++++++++++----
 .../apache/cassandra/schema/IndexMetadata.java  | 119 ++++++++++---------
 .../org/apache/cassandra/schema/Indexes.java    |  34 ------
 .../cassandra/schema/LegacySchemaMigrator.java  |  31 ++---
 .../apache/cassandra/schema/SchemaKeyspace.java |  46 +------
 .../cassandra/thrift/ThriftConversion.java      |  65 +++++++---
 .../unit/org/apache/cassandra/SchemaLoader.java |  17 ++-
 .../entities/FrozenCollectionsTest.java         |   6 +-
 .../validation/entities/SecondaryIndexTest.java |  67 ++++++++++-
 .../org/apache/cassandra/db/CleanupTest.java    |   4 +-
 .../apache/cassandra/db/DirectoriesTest.java    |   6 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  16 ++-
 .../apache/cassandra/db/SecondaryIndexTest.java |  41 +++----
 .../org/apache/cassandra/index/StubIndex.java   |   6 +-
 .../index/internal/CustomCassandraIndex.java    |  93 ++-------------
 .../org/apache/cassandra/schema/DefsTest.java   |  38 ++----
 .../schema/LegacySchemaMigratorTest.java        |  12 +-
 29 files changed, 494 insertions(+), 434 deletions(-)
----------------------------------------------------------------------



[3/5] cassandra git commit: Remove target_columns from index metadata

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index 0003f8f..2a43e33 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -29,12 +29,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.index.SecondaryIndexManager;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.Row;
@@ -44,6 +43,8 @@ import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.Util.throwAssert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -63,8 +64,8 @@ public class SecondaryIndexTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, WITH_COMPOSITE_INDEX, true) .gcGraceSeconds(0),
-                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, COMPOSITE_INDEX_TO_BE_ADDED, false) .gcGraceSeconds(0),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, WITH_COMPOSITE_INDEX, true).gcGraceSeconds(0),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, COMPOSITE_INDEX_TO_BE_ADDED, false).gcGraceSeconds(0),
                                     SchemaLoader.keysIndexCFMD(KEYSPACE1, WITH_KEYS_INDEX, true).gcGraceSeconds(0));
     }
 
@@ -435,10 +436,12 @@ public class SecondaryIndexTest
         // create a row and update the birthdate value, test that the index query fetches the new version
         new RowUpdateBuilder(cfs.metadata, 0, "k1").clustering("c").add("birthdate", 1L).build().applyUnsafe();
 
+        String indexName = "birthdate_index";
         ColumnDefinition old = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(old,
-                                                                 "birthdate_index",
-                                                                 IndexMetadata.IndexType.COMPOSITES,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
+                                                                 new IndexTarget(old.name, IndexTarget.Type.VALUES),
+                                                                 indexName,
+                                                                 IndexMetadata.Kind.COMPOSITES,
                                                                  Collections.EMPTY_MAP);
         cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
         Future<?> future = cfs.indexManager.addIndex(indexDef);
@@ -446,15 +449,11 @@ public class SecondaryIndexTest
 
         // we had a bug (CASSANDRA-2244) where index would get created but not flushed -- check for that
         // the way we find the index cfs is a bit convoluted at the moment
-        ColumnDefinition cDef = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        String indexName = getIndexNameForColumn(cfs, cDef);
-        assertNotNull(indexName);
         boolean flushed = false;
-        for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores())
-        {
-            if (SecondaryIndexManager.getIndexName(indexCfs).equals(indexName))
-                flushed = indexCfs.getLiveSSTables().size() > 0;
-        }
+        ColumnFamilyStore indexCfs = cfs.indexManager.getIndex(indexDef)
+                                                     .getBackingTable()
+                                                     .orElseThrow(throwAssert("Index not found"));
+        flushed = !indexCfs.getLiveSSTables().isEmpty();
         assertTrue(flushed);
         assertIndexedOne(cfs, ByteBufferUtil.bytes("birthdate"), 1L);
 
@@ -469,18 +468,6 @@ public class SecondaryIndexTest
         assertIndexedOne(cfs, ByteBufferUtil.bytes("birthdate"), 1L);
     }
 
-    private String getIndexNameForColumn(ColumnFamilyStore cfs, ColumnDefinition column)
-    {
-        // this is mega-ugly because there is a mismatch between the name of an index
-        // stored in schema metadata & the name used to refer to that index in other
-        // places (such as system.IndexInfo). Hopefully this is temporary and
-        // Index.getIndexName() can be made equalivalent to Index.getIndexMetadata().name
-        // (ideally even removing the former completely)
-        Collection<IndexMetadata> indexes = cfs.metadata.getIndexes().get(column);
-        assertEquals(1, indexes.size());
-        return cfs.indexManager.getIndex(indexes.iterator().next()).getIndexName();
-    }
-
     @Test
     public void testKeysSearcherSimple() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index 0ea03dc..544d482 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -23,7 +23,6 @@ import java.util.concurrent.Callable;
 import java.util.function.BiFunction;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -70,6 +69,11 @@ public class StubIndex implements Index
         return false;
     }
 
+    public boolean dependsOn(ColumnDefinition column)
+    {
+        return false;
+    }
+
     public boolean supportsExpression(ColumnDefinition column, Operator operator)
     {
         return operator == Operator.EQ;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 0a62d9b..8695018 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -20,27 +20,27 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.index.SecondaryIndexBuilder;
-import org.apache.cassandra.index.internal.composites.CompositesSearcher;
-import org.apache.cassandra.index.internal.keys.KeysSearcher;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions;
+import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata;
+import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget;
+
 /**
  * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify
  * behaviour of flushing CFS backed CUSTOM indexes
@@ -145,14 +145,14 @@ public class CustomCassandraIndex implements Index
     private void setMetadata(IndexMetadata indexDef)
     {
         metadata = indexDef;
-        functions = getFunctions(baseCfs.metadata, indexDef);
+        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
+        functions = getFunctions(indexDef, target);
         CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              cfm.cfName,
                                                              cfm,
                                                              baseCfs.getTracker().loadsstables);
-        assert indexDef.columns.size() == 1 : "Build in indexes on multiple target columns are not supported";
-        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+        indexedColumn = target.left;
     }
 
     public Callable<?> getTruncateTask(final long truncatedAt)
@@ -174,6 +174,11 @@ public class CustomCassandraIndex implements Index
         return isPrimaryKeyIndex() || columns.contains(indexedColumn);
     }
 
+    public boolean dependsOn(ColumnDefinition column)
+    {
+        return column.equals(indexedColumn);
+    }
+
     public boolean supportsExpression(ColumnDefinition column, Operator operator)
     {
         return indexedColumn.name.equals(column.name)
@@ -668,76 +673,4 @@ public class CustomCassandraIndex implements Index
                             .map(SSTableReader::toString)
                             .collect(Collectors.joining(", "));
     }
-
-    /**
-     * Construct the CFMetadata for an index table, the clustering columns in the index table
-     * vary dependent on the kind of the indexed value.
-     * @param baseCfsMetadata
-     * @param indexMetadata
-     * @return
-     */
-    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
-    {
-        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
-        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
-        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
-        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
-                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
-                                                       .withId(baseCfsMetadata.cfId)
-                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
-                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
-
-        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
-        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
-        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
-    }
-
-    /**
-     * Factory method for new CassandraIndex instances
-     * @param baseCfs
-     * @param indexMetadata
-     * @return
-     */
-    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
-    {
-        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
-    }
-
-    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
-    {
-        if (indexDef.isKeys())
-            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
-
-        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
-        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
-        {
-            switch (((CollectionType)indexedColumn.type).kind)
-            {
-                case LIST:
-                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
-                case SET:
-                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
-                case MAP:
-                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
-                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
-                    else
-                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
-            }
-        }
-
-        switch (indexedColumn.kind)
-        {
-            case CLUSTERING:
-                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
-            case REGULAR:
-                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
-            case PARTITION_KEY:
-                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
-            //case COMPACT_VALUE:
-            //    return new CompositesIndexOnCompactValue();
-        }
-        throw new AssertionError();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index ee73b2b..f348c30 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -44,14 +44,13 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.OldNetworkTopologyStrategy;
 import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.Util.throwAssert;
 import static org.apache.cassandra.cql3.CQLTester.assertRows;
 import static org.apache.cassandra.cql3.CQLTester.row;
 import static org.junit.Assert.assertEquals;
@@ -501,6 +500,7 @@ public class DefsTest
         // persist keyspace definition in the system keyspace
         SchemaKeyspace.makeCreateKeyspaceMutation(Schema.instance.getKSMetaData(KEYSPACE6), FBUtilities.timestampMicros()).applyUnsafe();
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE6).getColumnFamilyStore(TABLE1i);
+        String indexName = "birthdate_key_index";
 
         // insert some data.  save the sstable descriptor so we can make sure it's marked for delete after the drop
         QueryProcessor.executeInternal(String.format(
@@ -510,37 +510,17 @@ public class DefsTest
                                        "key0", "col0", 1L, 1L);
 
         cfs.forceBlockingFlush();
-        ColumnDefinition indexedColumn = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        IndexMetadata index = cfs.metadata.getIndexes()
-                                          .get(indexedColumn)
-                                          .iterator()
-                                          .next();
-        ColumnFamilyStore indexCfs = cfs.indexManager.listIndexes()
-                                                     .stream()
-                                                     .filter(i -> i.getIndexMetadata().equals(index))
-                                                     .map(Index::getBackingTable)
-                                                     .findFirst()
-                                                     .orElseThrow(() -> new AssertionError("Index not found"))
-                                                     .orElseThrow(() -> new AssertionError("Index has no backing table"));
+        ColumnFamilyStore indexCfs = cfs.indexManager.getIndexByName(indexName)
+                                                     .getBackingTable()
+                                                     .orElseThrow(throwAssert("Cannot access index cfs"));
         Descriptor desc = indexCfs.getLiveSSTables().iterator().next().descriptor;
 
         // drop the index
         CFMetaData meta = cfs.metadata.copy();
-        // We currently have a mismatch between IndexMetadata.name (which is simply the name
-        // of the index) and what gets returned from SecondaryIndex#getIndexName() (usually, this
-        // defaults to <tablename>.<indexname>.
-        // IndexMetadata takes its lead from the prior implementation of ColumnDefinition.name
-        // which did not include the table name.
-        // This mismatch causes some other, long standing inconsistencies:
-        // nodetool rebuild_index <ks> <tbl> <idx>  - <idx> must be qualified, i.e. include the redundant table name
-        //                                            without it, the rebuild silently fails
-        // system.IndexInfo (which is also exposed over JMX as CF.BuildIndexes) uses the form <tbl>.<idx>
-        // cqlsh> describe index [<ks>.]<idx>  - here <idx> must not be qualified by the table name.
-        //
-        // This should get resolved as part of #9459 by better separating the index name from the
-        // name of it's underlying CFS (if it as one), as the comment in CFMetaData#indexColumnFamilyName promises
-        // Then we will be able to just use the value of SI#getIndexName() when removing an index from CFMetaData
-        IndexMetadata existing = meta.getIndexes().iterator().next();
+        IndexMetadata existing = cfs.metadata.getIndexes()
+                                             .get(indexName)
+                                             .orElseThrow(throwAssert("Index not found"));
+
         meta.indexes(meta.getIndexes().without(existing.name));
         MigrationManager.announceColumnFamilyUpdate(meta, false);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
index d841e91..7124e40 100644
--- a/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
+++ b/test/unit/org/apache/cassandra/schema/LegacySchemaMigratorTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.thrift.ThriftConversion;
 
 import static java.lang.String.format;
@@ -490,7 +491,7 @@ public class LegacySchemaMigratorTest
         {
             IndexMetadata i = index.get();
             adder.add("index_name", i.name);
-            adder.add("index_type", i.indexType.toString());
+            adder.add("index_type", i.kind.toString());
             adder.add("index_options", json(i.options));
         }
         else
@@ -504,11 +505,14 @@ public class LegacySchemaMigratorTest
     }
 
     private static Optional<IndexMetadata> findIndexForColumn(Indexes indexes,
-                                                                CFMetaData table,
-                                                                ColumnDefinition column)
+                                                              CFMetaData table,
+                                                              ColumnDefinition column)
     {
+        // Assumes that the string option denoting the index
+        // target can be parsed by CassandraIndex.parseTarget,
+        // which should be true for any pre-3.0 index
         for (IndexMetadata index : indexes)
-            if (index.indexedColumn(table).equals(column))
+          if (CassandraIndex.parseTarget(table, index).left.equals(column))
                 return Optional.of(index);
 
         return Optional.empty();


[4/5] cassandra git commit: Remove target_columns from index metadata

Posted by sa...@apache.org.
Remove target_columns from index metadata

Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for
CASSANDRA-10216


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fde97c3b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fde97c3b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fde97c3b

Branch: refs/heads/trunk
Commit: fde97c3b3d93901722ee2975552909e013b48b65
Parents: 1d94dc2
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Sep 16 11:09:02 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 16 17:25:23 2015 +0100

----------------------------------------------------------------------
 ...ore-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar | Bin 2218913 -> 0 bytes
 ...ore-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar | Bin 0 -> 2216851 bytes
 ...iver-internal-only-3.0.0a2.post0-96883eb.zip | Bin 230630 -> 0 bytes
 ...iver-internal-only-3.0.0a2.post0-fecbd54.zip | Bin 0 -> 230674 bytes
 .../org/apache/cassandra/config/CFMetaData.java |  13 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |   3 +-
 .../cql3/statements/AlterTableStatement.java    |  23 ++--
 .../cql3/statements/CreateIndexStatement.java   |  38 ++----
 .../cql3/statements/IndexPropDefs.java          |   5 +
 .../cassandra/cql3/statements/IndexTarget.java  | 102 +++++++++-------
 src/java/org/apache/cassandra/index/Index.java  |  13 ++
 .../cassandra/index/SecondaryIndexManager.java  |  19 ++-
 .../index/internal/CassandraIndex.java          | 111 +++++++++++++----
 .../apache/cassandra/schema/IndexMetadata.java  | 119 ++++++++++---------
 .../org/apache/cassandra/schema/Indexes.java    |  34 ------
 .../cassandra/schema/LegacySchemaMigrator.java  |  31 ++---
 .../apache/cassandra/schema/SchemaKeyspace.java |  46 +------
 .../cassandra/thrift/ThriftConversion.java      |  65 +++++++---
 .../unit/org/apache/cassandra/SchemaLoader.java |  17 ++-
 .../entities/FrozenCollectionsTest.java         |   6 +-
 .../validation/entities/SecondaryIndexTest.java |  67 ++++++++++-
 .../org/apache/cassandra/db/CleanupTest.java    |   4 +-
 .../apache/cassandra/db/DirectoriesTest.java    |   6 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  16 ++-
 .../apache/cassandra/db/SecondaryIndexTest.java |  41 +++----
 .../org/apache/cassandra/index/StubIndex.java   |   6 +-
 .../index/internal/CustomCassandraIndex.java    |  93 ++-------------
 .../org/apache/cassandra/schema/DefsTest.java   |  38 ++----
 .../schema/LegacySchemaMigratorTest.java        |  12 +-
 29 files changed, 494 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar
deleted file mode 100644
index fc5d2f0..0000000
Binary files a/lib/cassandra-driver-core-3.0.0-alpha3-8bd064d-SNAPSHOT-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..79d5a8a
Binary files /dev/null and b/lib/cassandra-driver-core-3.0.0-alpha3-d56dd0b-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip
deleted file mode 100644
index e55b4c3..0000000
Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-96883eb.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip
new file mode 100644
index 0000000..fe9e0de
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.0.0a2.post0-fecbd54.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index cf1dbbf..929a34a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -957,9 +957,18 @@ public final class CFMetaData
         {
             throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from));
         }
-        else if (getIndexes().hasIndexFor(def))
+
+        if (!getIndexes().isEmpty())
         {
-            throw new InvalidRequestException(String.format("Cannot rename column %s because it is secondary indexed", from));
+            ColumnFamilyStore store = Keyspace.openAndGetStore(this);
+            Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def);
+            if (!dependentIndexes.isEmpty())
+                throw new InvalidRequestException(String.format("Cannot rename column %s because it has " +
+                                                                "dependent secondary indexes (%s)",
+                                                                from,
+                                                                dependentIndexes.stream()
+                                                                                .map(i -> i.name)
+                                                                                .collect(Collectors.joining(","))));
         }
 
         ColumnDefinition newDef = def.withNewName(to);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index f6d54f5..b552165 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -726,7 +726,8 @@ createIndexStatement returns [CreateIndexStatement expr]
     ;
 
 indexIdent returns [IndexTarget.Raw id]
-    : c=cident                   { $id = IndexTarget.Raw.valuesOf(c); }
+    : c=cident                   { $id = IndexTarget.Raw.simpleIndexOn(c); }
+    | K_VALUES '(' c=cident ')'  { $id = IndexTarget.Raw.valuesOf(c); }
     | K_KEYS '(' c=cident ')'    { $id = IndexTarget.Raw.keysOf(c); }
     | K_ENTRIES '(' c=cident ')' { $id = IndexTarget.Raw.keysAndValuesOf(c); }
     | K_FULL '(' c=cident ')'    { $id = IndexTarget.Raw.fullCollection(c); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index af9a75c..0d2011b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 
@@ -26,6 +27,8 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.CFName;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -268,17 +271,21 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         break;
                 }
 
-                // If the dropped column is the only target column of a secondary
-                // index (and it's only possible to create an index with TargetType.COLUMN
-                // and a single target right now) we need to also drop the index.
+                // If the dropped column is required by any secondary indexes
+                // we reject the operation, as the indexes must be dropped first
                 Indexes allIndexes = cfm.getIndexes();
-                Collection<IndexMetadata> indexes = allIndexes.get(def);
-                for (IndexMetadata index : indexes)
+                if (!allIndexes.isEmpty())
                 {
-                    assert index.columns.size() == 1 : String.format("Can't drop column %s as it's a target of multi-column index %s", def.name, index.name);
-                    allIndexes = allIndexes.without(index.name);
+                    ColumnFamilyStore store = Keyspace.openAndGetStore(cfm);
+                    Set<IndexMetadata> dependentIndexes = store.indexManager.getDependentIndexes(def);
+                    if (!dependentIndexes.isEmpty())
+                        throw new InvalidRequestException(String.format("Cannot drop column %s because it has " +
+                                                                        "dependent secondary indexes (%s)",
+                                                                        def,
+                                                                        dependentIndexes.stream()
+                                                                                        .map(i -> i.name)
+                                                                                        .collect(Collectors.joining(","))));
                 }
-                cfm.indexes(allIndexes);
 
                 // If a column is dropped which is included in a view, we don't allow the drop to take place.
                 boolean rejectAlter = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 19d89b0..6cc416d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,9 +80,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         IndexTarget target = rawTarget.prepare(cfm);
         ColumnDefinition cd = cfm.getColumnDefinition(target.column);
 
-        if (cd == null)
-            throw new InvalidRequestException("No column definition found for column " + target.column);
-
         boolean isMap = cd.type instanceof MapType;
         boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
 
@@ -94,7 +90,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         else
         {
             validateNotFullIndex(target);
-            validateIsValuesIndexIfTargetColumnNotCollection(cd, target);
+            validateIsSimpleIndexIfTargetColumnNotCollection(cd, target);
             validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
         }
 
@@ -136,7 +132,9 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     private void validateForFrozenCollection(IndexTarget target) throws InvalidRequestException
     {
         if (target.type != IndexTarget.Type.FULL)
-            throw new InvalidRequestException(String.format("Cannot create index on %s of frozen<map> column %s", target.type, target.column));
+            throw new InvalidRequestException(String.format("Cannot create %s() index on frozen column %s. " +
+                                                            "Frozen collections only support full() indexes",
+                                                            target.type, target.column));
     }
 
     private void validateNotFullIndex(IndexTarget target) throws InvalidRequestException
@@ -145,11 +143,12 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             throw new InvalidRequestException("full() indexes can only be created on frozen collections");
     }
 
-    private void validateIsValuesIndexIfTargetColumnNotCollection(ColumnDefinition cd, IndexTarget target) throws InvalidRequestException
+    private void validateIsSimpleIndexIfTargetColumnNotCollection(ColumnDefinition cd, IndexTarget target) throws InvalidRequestException
     {
-        if (!cd.type.isCollection() && target.type != IndexTarget.Type.VALUES)
-            throw new InvalidRequestException(String.format("Cannot create index on %s of column %s; only non-frozen collections support %s indexes",
-                                                            target.type, target.column, target.type));
+        if (!cd.type.isCollection() && target.type != IndexTarget.Type.SIMPLE)
+            throw new InvalidRequestException(String.format("Cannot create %s() index on %s. " +
+                                                            "Non-collection columns support only simple indexes",
+                                                            target.type.toString(), target.column));
     }
 
     private void validateTargetColumnIsMapIfIndexInvolvesKeys(boolean isMap, IndexTarget target) throws InvalidRequestException
@@ -180,31 +179,20 @@ public class CreateIndexStatement extends SchemaAlteringStatement
                 throw new InvalidRequestException(String.format("Index %s already exists", acceptedName));
         }
 
-        IndexMetadata.IndexType indexType;
+        IndexMetadata.Kind kind;
         Map<String, String> indexOptions;
         if (properties.isCustom)
         {
-            indexType = IndexMetadata.IndexType.CUSTOM;
+            kind = IndexMetadata.Kind.CUSTOM;
             indexOptions = properties.getOptions();
         }
-        else if (cfm.isCompound())
-        {
-            Map<String, String> options = Collections.emptyMap();
-            // For now, we only allow indexing values for collections, but we could later allow
-            // to also index map keys, so we record that this is the values we index to make our
-            // lives easier then.
-            if (cd.type.isCollection() && cd.type.isMultiCell())
-                options = ImmutableMap.of(target.type.indexOption(), "");
-            indexType = IndexMetadata.IndexType.COMPOSITES;
-            indexOptions = options;
-        }
         else
         {
-            indexType = IndexMetadata.IndexType.KEYS;
             indexOptions = Collections.emptyMap();
+            kind = cfm.isCompound() ? IndexMetadata.Kind.COMPOSITES : IndexMetadata.Kind.KEYS;
         }
 
-        IndexMetadata index = IndexMetadata.singleColumnIndex(cd, acceptedName, indexType, indexOptions);
+        IndexMetadata index = IndexMetadata.singleTargetIndex(cfm, target, acceptedName, kind, indexOptions);
 
         // check to disallow creation of an index which duplicates an existing one in all but name
         Optional<IndexMetadata> existingIndex = Iterables.tryFind(cfm.getIndexes(), existing -> existing.equalsWithoutName(index));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
index e067217..b8ce7ec 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexPropDefs.java
@@ -54,6 +54,11 @@ public class IndexPropDefs extends PropertyDefinitions
         if (getRawOptions().containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
             throw new InvalidRequestException(String.format("Cannot specify %s as a CUSTOM option",
                                                             IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+
+        if (getRawOptions().containsKey(IndexTarget.TARGET_OPTION_NAME))
+            throw new InvalidRequestException(String.format("Cannot specify %s as a CUSTOM option",
+                                                            IndexTarget.TARGET_OPTION_NAME));
+
     }
 
     public Map<String, String> getRawOptions() throws SyntaxException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
index f948d8b..6210a86 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
@@ -17,15 +17,16 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 
 public class IndexTarget
 {
+    public static final String TARGET_OPTION_NAME = "target";
     public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
 
     /**
@@ -34,22 +35,43 @@ public class IndexTarget
     public static final String INDEX_KEYS_OPTION_NAME = "index_keys";
 
     /**
-     * The name of the option used to specify that the index is on the collection values.
+     * The name of the option used to specify that the index is on the collection (map) entries.
      */
-    public static final String INDEX_VALUES_OPTION_NAME = "index_values";
+    public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
 
     /**
-     * The name of the option used to specify that the index is on the collection (map) entries.
+     * Regex for *unquoted* column names; anything which does not match this pattern must be a quoted name
      */
-    public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
+    private static final Pattern COLUMN_IDENTIFIER_PATTERN = Pattern.compile("[a-z_0-9]+");
 
     public final ColumnIdentifier column;
+    public final boolean quoteName;
     public final Type type;
 
-    private IndexTarget(ColumnIdentifier column, Type type)
+    public IndexTarget(ColumnIdentifier column, Type type)
     {
         this.column = column;
         this.type = type;
+
+        // if the column name contains anything other than lower case alphanumerics
+        // or underscores, then it must be quoted when included in the target string
+        quoteName = !COLUMN_IDENTIFIER_PATTERN.matcher(column.toString()).matches();
+    }
+
+    public String asCqlString(CFMetaData cfm)
+    {
+        if (! cfm.getColumnDefinition(column).type.isCollection())
+            return maybeEscapeQuotedName(column.toString());
+
+        return String.format("%s(%s)", type.toString(), maybeEscapeQuotedName(column.toString()));
+    }
+
+    // Quoted column names may themselves contain quotes; these need
+    // to be escaped with a preceding quote when written out as CQL.
+    // Of course, the escaped name also needs to be wrapped in quotes.
+    private String maybeEscapeQuotedName(String name)
+    {
+        return quoteName ? '\"' + name.replace("\"", "\"\"") + '\"' : name;
     }
 
     public static class Raw
@@ -63,6 +85,11 @@ public class IndexTarget
             this.type = type;
         }
 
+        public static Raw simpleIndexOn(ColumnIdentifier.Raw c)
+        {
+            return new Raw(c, Type.SIMPLE);
+        }
+
         public static Raw valuesOf(ColumnIdentifier.Raw c)
         {
             return new Raw(c, Type.VALUES);
@@ -85,13 +112,24 @@ public class IndexTarget
 
         public IndexTarget prepare(CFMetaData cfm)
         {
-            return new IndexTarget(column.prepare(cfm), type);
+            // Until we've prepared the target column, we can't be certain about the target type
+            // because (for backwards compatibility) an index on a collection's values uses the
+            // same syntax as an index on a regular column (i.e. the 'values' in
+            // 'CREATE INDEX on table(values(collection));' is optional). So we correct the target type
+            // when the target column is a collection & the target type is SIMPLE.
+            ColumnIdentifier colId = column.prepare(cfm);
+            ColumnDefinition columnDef = cfm.getColumnDefinition(colId);
+            if (columnDef == null)
+                throw new InvalidRequestException("No column definition found for column " + colId);
+
+            Type actualType = (type == Type.SIMPLE && columnDef.type.isCollection()) ? Type.VALUES : type;
+            return new IndexTarget(colId, actualType);
         }
     }
 
     public static enum Type
     {
-        VALUES, KEYS, KEYS_AND_VALUES, FULL;
+        VALUES, KEYS, KEYS_AND_VALUES, FULL, SIMPLE;
 
         public String toString()
         {
@@ -100,44 +138,26 @@ public class IndexTarget
                 case KEYS: return "keys";
                 case KEYS_AND_VALUES: return "entries";
                 case FULL: return "full";
-                default: return "values";
-            }
-        }
-
-        public String indexOption()
-        {
-            switch (this)
-            {
-                case KEYS: return INDEX_KEYS_OPTION_NAME;
-                case KEYS_AND_VALUES: return INDEX_ENTRIES_OPTION_NAME;
-                case VALUES: return INDEX_VALUES_OPTION_NAME;
-                default: throw new AssertionError();
+                case VALUES: return "values";
+                case SIMPLE: return "";
+                default: return "";
             }
         }
 
-        public static Type fromIndexMetadata(IndexMetadata index, CFMetaData cfm)
+        public static Type fromString(String s)
         {
-            Map<String, String> options = index.options;
-            if (options.containsKey(INDEX_KEYS_OPTION_NAME))
-            {
+            if ("".equals(s))
+                return SIMPLE;
+            else if ("values".equals(s))
+                return VALUES;
+            else if ("keys".equals(s))
                 return KEYS;
-            }
-            else if (options.containsKey(INDEX_ENTRIES_OPTION_NAME))
-            {
+            else if ("entries".equals(s))
                 return KEYS_AND_VALUES;
-            }
-            else
-            {
-                ColumnDefinition cd = index.indexedColumn(cfm);
-                if (cd.type.isCollection() && !cd.type.isMultiCell())
-                {
-                    return FULL;
-                }
-                else
-                {
-                    return VALUES;
-                }
-            }
+            else if ("full".equals(s))
+                return FULL;
+
+            throw new AssertionError("Unrecognized index target type " + s);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 5dc44a4..f07baad 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -202,6 +202,19 @@ public interface Index
      */
     public boolean indexes(PartitionColumns columns);
 
+    /**
+     * Called to determine whether this index targets a specific column.
+     * Used during schema operations such as when dropping or renaming a column, to check if
+     * the index will be affected by the change. Typically, if an index answers that it does
+     * depend upon a column, then schema operations on that column are not permitted until the index
+     * is dropped or altered.
+     *
+     * @param column the column definition to check
+     * @return true if the index depends on the supplied column being present; false if the column may be
+     *              safely dropped or modified without adversely affecting the index
+     */
+    public boolean dependsOn(ColumnDefinition column);
+
     // TODO : this will change when we decouple indexes from specific columns for real per-row indexes
     /**
      * Called to determine whether this index can provide a searcher to execute a query on the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index ff4567b..1af2f6e 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -31,12 +31,10 @@ import com.google.common.collect.Maps;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.db.Directories;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -188,6 +186,21 @@ public class SecondaryIndexManager implements IndexRegistry
         }
     }
 
+
+    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
+    {
+        if (indexes.isEmpty())
+            return Collections.emptySet();
+
+        Set<IndexMetadata> dependentIndexes = new HashSet<>();
+        for (Index index : indexes.values())
+            if (index.dependsOn(column))
+                dependentIndexes.add(index.getIndexMetadata());
+
+        return dependentIndexes;
+    }
+
+
     /**
      * Called when dropping a Table
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 89c072c..d10af1f 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -5,14 +5,18 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.*;
@@ -38,6 +42,7 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -49,13 +54,15 @@ public abstract class CassandraIndex implements Index
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 
+    public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
+
     public final ColumnFamilyStore baseCfs;
     protected IndexMetadata metadata;
     protected ColumnFamilyStore indexCfs;
     protected ColumnDefinition indexedColumn;
     protected CassandraIndexFunctions functions;
 
-    public CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
     {
         this.baseCfs = baseCfs;
         setMetadata(indexDef);
@@ -69,7 +76,7 @@ public abstract class CassandraIndex implements Index
      */
     protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
     {
-        return operator.equals(Operator.EQ);
+        return operator == Operator.EQ;
     }
 
     /**
@@ -77,7 +84,6 @@ public abstract class CassandraIndex implements Index
      * The clustering columns in the index table encode the values required to retrieve the correct data from the base
      * table and varies depending on the kind of the indexed column. See indexCfsMetadata for more details
      * Used whenever a row in the index table is written or deleted.
-     * @param metadata
      * @param partitionKey from the base data being indexed
      * @param prefix from the base data being indexed
      * @param path from the base data being indexed
@@ -90,7 +96,6 @@ public abstract class CassandraIndex implements Index
     /**
      * Used at search time to convert a row in the index table into a simple struct containing the values required
      * to retrieve the corresponding row from the base table.
-     * @param metadata
      * @param indexedValue the partition key of the indexed table (i.e. the value that was indexed)
      * @param indexEntry a row from the index table
      * @return
@@ -102,7 +107,6 @@ public abstract class CassandraIndex implements Index
      * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
      * Used at read time to identify out of date index entries so that they can be excluded from search results and
      * repaired
-     * @param metadata required to get the indexed column definition
      * @param row the current row from the primary data table
      * @param indexValue the value we retrieved from the index
      * @param nowInSec
@@ -112,7 +116,6 @@ public abstract class CassandraIndex implements Index
 
     /**
      * Extract the value to be inserted into the index from the components of the base data
-     * @param metadata
      * @param partitionKey from the primary data
      * @param clustering from the primary data
      * @param path from the primary data
@@ -197,14 +200,14 @@ public abstract class CassandraIndex implements Index
     private void setMetadata(IndexMetadata indexDef)
     {
         metadata = indexDef;
-        functions = getFunctions(baseCfs.metadata, indexDef);
+        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
+        functions = getFunctions(indexDef, target);
         CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              cfm.cfName,
                                                              cfm,
                                                              baseCfs.getTracker().loadsstables);
-        assert indexDef.columns.size() == 1 : "Build in indexes on multiple target columns are not supported";
-        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+        indexedColumn = target.left;
     }
 
     public Callable<?> getTruncateTask(final long truncatedAt)
@@ -227,6 +230,11 @@ public abstract class CassandraIndex implements Index
         return isPrimaryKeyIndex() || columns.contains(indexedColumn);
     }
 
+    public boolean dependsOn(ColumnDefinition column)
+    {
+        return indexedColumn.name.equals(column.name);
+    }
+
     public boolean supportsExpression(ColumnDefinition column, Operator operator)
     {
         return indexedColumn.name.equals(column.name)
@@ -269,7 +277,7 @@ public abstract class CassandraIndex implements Index
         if (target.isPresent())
         {
             target.get().validateForIndexing();
-            switch (getIndexMetadata().indexType)
+            switch (getIndexMetadata().kind)
             {
                 case COMPOSITES:
                     return new CompositesSearcher(command, target.get(), this);
@@ -277,7 +285,7 @@ public abstract class CassandraIndex implements Index
                     return new KeysSearcher(command, target.get(), this);
                 default:
                     throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
-                                                                  metadata.indexType,
+                                                                  metadata.kind,
                                                                   metadata.name,
                                                                   indexedColumn.name.toString()));
             }
@@ -531,7 +539,7 @@ public abstract class CassandraIndex implements Index
     private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
     {
         assert indexedColumn.isPartitionKey();
-        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null ));
+        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
     }
 
     private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
@@ -613,7 +621,7 @@ public abstract class CassandraIndex implements Index
         Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
         CompactionManager.instance.interruptCompactionForCFs(cfss, true);
         CompactionManager.instance.waitForCessation(cfss);
-        indexCfs.keyspace.writeOrder.awaitNewBarrier();
+        Keyspace.writeOrder.awaitNewBarrier();
         indexCfs.forceBlockingFlush();
         indexCfs.readOrdering.awaitNewBarrier();
         indexCfs.invalidate();
@@ -695,8 +703,9 @@ public abstract class CassandraIndex implements Index
      */
     public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
     {
-        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
-        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
+        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
+        CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
+        ColumnDefinition indexedColumn = target.left;
         AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
         CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
                                                                baseCfsMetadata.indexColumnFamilyName(indexMetadata))
@@ -715,17 +724,65 @@ public abstract class CassandraIndex implements Index
      * @param indexMetadata
      * @return
      */
-    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    {
+        return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
+    }
+
+    // Public because it's also used to convert index metadata into a thrift-compatible format
+    public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
+                                                                       IndexMetadata indexDef)
     {
-        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
+        String target = indexDef.options.get("target");
+        assert target != null : String.format("No target definition found for index %s", indexDef.name);
+
+        // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
+        // if not, then it must be a simple column name and implicitly its type is VALUES
+        Matcher matcher = TARGET_REGEX.matcher(target);
+        String columnName;
+        IndexTarget.Type targetType;
+        if (matcher.matches())
+        {
+            targetType = IndexTarget.Type.fromString(matcher.group(1));
+            columnName = matcher.group(2);
+        }
+        else
+        {
+            columnName = target;
+            targetType = IndexTarget.Type.VALUES;
+        }
+
+        // in the case of a quoted column name the name in the target string
+        // will be enclosed in quotes, which we need to unwrap. It may also
+        // include quote characters internally, escaped like so:
+        //      abc"def -> abc""def.
+        // Because the target string is stored in a CQL compatible form, we
+        // need to un-escape any such quotes to get the actual column name
+        if (columnName.startsWith("\""))
+        {
+            columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
+            columnName = columnName.replaceAll("\"\"", "\"");
+        }
+
+        // if it's not a CQL table, we can't assume that the column name is utf8, so
+        // in that case we have to do a linear scan of the cfm's columns to get the matching one
+        if (cfm.isCQLTable())
+            return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
+        else
+            for (ColumnDefinition column : cfm.allColumns())
+                if (column.name.toString().equals(columnName))
+                    return Pair.create(column, targetType);
+
+        throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
     }
 
-    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
+    static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
+                                                Pair<ColumnDefinition, IndexTarget.Type> target)
     {
         if (indexDef.isKeys())
             return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 
-        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
+        ColumnDefinition indexedColumn = target.left;
         if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
         {
             switch (((CollectionType)indexedColumn.type).kind)
@@ -735,12 +792,16 @@ public abstract class CassandraIndex implements Index
                 case SET:
                     return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
                 case MAP:
-                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
-                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
-                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
-                    else
-                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                    switch (target.right)
+                    {
+                        case KEYS:
+                            return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                        case KEYS_AND_VALUES:
+                            return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+                        case VALUES:
+                            return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                    }
+                    throw new AssertionError();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 6846a14..8052e9e 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.schema;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.slf4j.Logger;
@@ -50,53 +52,78 @@ public final class IndexMetadata
 
     public static final Serializer serializer = new Serializer();
 
-    public enum IndexType
+    public enum Kind
     {
         KEYS, CUSTOM, COMPOSITES
     }
 
-    public enum TargetType
-    {
-        COLUMN, ROW
-    }
-
     // UUID for serialization. This is a deterministic UUID generated from the index name
     // Both the id and name are guaranteed unique per keyspace.
     public final UUID id;
     public final String name;
-    public final IndexType indexType;
-    public final TargetType targetType;
+    public final Kind kind;
     public final Map<String, String> options;
-    public final Set<ColumnIdentifier> columns;
 
     private IndexMetadata(String name,
                           Map<String, String> options,
-                          IndexType indexType,
-                          TargetType targetType,
-                          Set<ColumnIdentifier> columns)
+                          Kind kind)
     {
         this.id = UUID.nameUUIDFromBytes(name.getBytes());
         this.name = name;
         this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
-        this.indexType = indexType;
-        this.targetType = targetType;
-        this.columns = columns == null ? ImmutableSet.of() : ImmutableSet.copyOf(columns);
+        this.kind = kind;
     }
 
-    public static IndexMetadata singleColumnIndex(ColumnIdentifier column,
-                                                  String name,
-                                                  IndexType type,
-                                                  Map<String, String> options)
+    public static IndexMetadata fromLegacyMetadata(CFMetaData cfm,
+                                                   ColumnDefinition column,
+                                                   String name,
+                                                   Kind kind,
+                                                   Map<String, String> options)
+    {
+        Map<String, String> newOptions = new HashMap<>(); // mutable copy; the caller's map is never modified
+        if (options != null)
+            newOptions.putAll(options);
+
+        IndexTarget target; // the legacy marker option is replaced by the "target" option written below
+        if (newOptions.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
+        {
+            newOptions.remove(IndexTarget.INDEX_KEYS_OPTION_NAME);
+            target = new IndexTarget(column.name, IndexTarget.Type.KEYS);
+        }
+        else if (newOptions.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
+        {
+            newOptions.remove(IndexTarget.INDEX_ENTRIES_OPTION_NAME); // remove the marker this branch matched
+            target = new IndexTarget(column.name, IndexTarget.Type.KEYS_AND_VALUES);
+        }
+        else
+        {
+            if (column.type.isCollection() && !column.type.isMultiCell()) // non-multi-cell collection: FULL target
+            {
+                target = new IndexTarget(column.name, IndexTarget.Type.FULL);
+            }
+            else
+            {
+                target = new IndexTarget(column.name, IndexTarget.Type.VALUES);
+            }
+        }
+        newOptions.put(IndexTarget.TARGET_OPTION_NAME, target.asCqlString(cfm));
+        return new IndexMetadata(name, newOptions, kind);
+    }
+
+    public static IndexMetadata fromSchemaMetadata(String name, Kind kind, Map<String, String> options)
     {
-        return new IndexMetadata(name, options, type, TargetType.COLUMN, Collections.singleton(column));
+        return new IndexMetadata(name, options, kind);
     }
 
-    public static IndexMetadata singleColumnIndex(ColumnDefinition column,
+    public static IndexMetadata singleTargetIndex(CFMetaData cfm,
+                                                  IndexTarget target,
                                                   String name,
-                                                  IndexType type,
+                                                  Kind kind,
                                                   Map<String, String> options)
     {
-        return singleColumnIndex(column.name, name, type, options);
+        Map<String, String> newOptions = options == null ? new HashMap<>() : new HashMap<>(options); // null options tolerated, as in the constructor
+        newOptions.put(IndexTarget.TARGET_OPTION_NAME, target.asCqlString(cfm));
+        return new IndexMetadata(name, newOptions, kind);
     }
 
     public static boolean isNameValid(String name)
@@ -115,13 +142,10 @@ public final class IndexMetadata
         if (!isNameValid(name))
             throw new ConfigurationException("Illegal index name " + name);
 
-        if (indexType == null)
-            throw new ConfigurationException("Index type is null for index " + name);
+        if (kind == null)
+            throw new ConfigurationException("Index kind is null for index " + name);
 
-        if (targetType == null)
-            throw new ConfigurationException("Target type is null for index " + name);
-
-        if (indexType == IndexMetadata.IndexType.CUSTOM)
+        if (kind == Kind.CUSTOM)
         {
             if (options == null || !options.containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
                 throw new ConfigurationException(String.format("Required option missing for index %s : %s",
@@ -169,47 +193,29 @@ public final class IndexMetadata
         }
     }
 
-    // to be removed in CASSANDRA-10124 with multi-target & row based indexes
-    public ColumnDefinition indexedColumn(CFMetaData cfm)
-    {
-       return cfm.getColumnDefinition(columns.iterator().next());
-    }
-
     public boolean isCustom()
     {
-        return indexType == IndexType.CUSTOM;
+        return kind == Kind.CUSTOM;
     }
 
     public boolean isKeys()
     {
-        return indexType == IndexType.KEYS;
+        return kind == Kind.KEYS;
     }
 
     public boolean isComposites()
     {
-        return indexType == IndexType.COMPOSITES;
-    }
-
-    public boolean isRowIndex()
-    {
-        return targetType == TargetType.ROW;
-    }
-
-    public boolean isColumnIndex()
-    {
-        return targetType == TargetType.COLUMN;
+        return kind == Kind.COMPOSITES;
     }
 
     public int hashCode()
     {
-        return Objects.hashCode(id, name, indexType, targetType, options, columns);
+        return Objects.hashCode(id, name, kind, options);
     }
 
     public boolean equalsWithoutName(IndexMetadata other)
     {
-        return Objects.equal(indexType, other.indexType)
-            && Objects.equal(targetType, other.targetType)
-            && Objects.equal(columns, other.columns)
+        return Objects.equal(kind, other.kind)
             && Objects.equal(options, other.options);
     }
 
@@ -231,9 +237,7 @@ public final class IndexMetadata
         return new ToStringBuilder(this)
             .append("id", id.toString())
             .append("name", name)
-            .append("indexType", indexType)
-            .append("targetType", targetType)
-            .append("columns", columns)
+            .append("kind", kind)
             .append("options", options)
             .build();
     }
@@ -255,6 +259,5 @@ public final class IndexMetadata
         {
             return UUIDSerializer.serializer.serializedSize(metadata.id, version);
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 9114f63..422a94c 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -21,9 +21,7 @@ package org.apache.cassandra.schema;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
 
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 
@@ -42,13 +40,11 @@ public class Indexes implements Iterable<IndexMetadata>
 {
     private final ImmutableMap<String, IndexMetadata> indexesByName;
     private final ImmutableMap<UUID, IndexMetadata> indexesById;
-    private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
 
     private Indexes(Builder builder)
     {
         indexesByName = builder.indexesByName.build();
         indexesById = builder.indexesById.build();
-        indexesByColumn = builder.indexesByColumn.build();
     }
 
     public static Builder builder()
@@ -121,28 +117,6 @@ public class Indexes implements Iterable<IndexMetadata>
     }
 
     /**
-     * Get the index associated with the specified column. This may be removed or modified as support is added
-     * for indexes with multiple target columns and with TargetType.ROW
-     *
-     * @param column a column definition for which an {@link IndexMetadata} is being sought
-     * @return an empty {@link Optional} if the named index is not found; a non-empty optional of {@link IndexMetadata} otherwise
-     */
-    public Collection<IndexMetadata> get(ColumnDefinition column)
-    {
-        return indexesByColumn.get(column.name);
-    }
-
-    /**
-     * Answer true if an index is associated with the specified column.
-     * @param column
-     * @return
-     */
-    public boolean hasIndexFor(ColumnDefinition column)
-    {
-        return !indexesByColumn.get(column.name).isEmpty();
-    }
-
-    /**
      * Create a SecondaryIndexes instance with the provided index added
      */
     public Indexes with(IndexMetadata index)
@@ -206,7 +180,6 @@ public class Indexes implements Iterable<IndexMetadata>
     {
         final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>();
         final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>();
-        final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
 
         private Builder()
         {
@@ -221,13 +194,6 @@ public class Indexes implements Iterable<IndexMetadata>
         {
             indexesByName.put(index.name, index);
             indexesById.put(index.id, index);
-            // All indexes are column indexes at the moment
-            if (index.isColumnIndex())
-            {
-                for (ColumnIdentifier target : index.columns)
-                    indexesByColumn.put(target, index);
-
-            }
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 1674de8..0d5a040 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -296,15 +296,6 @@ public final class LegacySchemaMigrator
                                                                         isStaticCompactTable,
                                                                         needsUpgrade);
 
-        Indexes indexes = createIndexesFromColumnRows(columnRows,
-                                                      ksName,
-                                                      cfName,
-                                                      rawComparator,
-                                                      subComparator,
-                                                      isSuper,
-                                                      isCQLTable,
-                                                      isStaticCompactTable,
-                                                      needsUpgrade);
 
         if (needsUpgrade)
         {
@@ -328,6 +319,17 @@ public final class LegacySchemaMigrator
                                            false, // legacy schema did not contain views
                                            columnDefs,
                                            DatabaseDescriptor.getPartitioner());
+
+        Indexes indexes = createIndexesFromColumnRows(cfm,
+                                                      columnRows,
+                                                      ksName,
+                                                      cfName,
+                                                      rawComparator,
+                                                      subComparator,
+                                                      isSuper,
+                                                      isCQLTable,
+                                                      isStaticCompactTable,
+                                                      needsUpgrade);
         cfm.indexes(indexes);
 
         if (tableRow.has("dropped_columns"))
@@ -591,7 +593,8 @@ public final class LegacySchemaMigrator
         return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
     }
 
-    private static Indexes createIndexesFromColumnRows(UntypedResultSet rows,
+    private static Indexes createIndexesFromColumnRows(CFMetaData cfm,
+                                                       UntypedResultSet rows,
                                                        String keyspace,
                                                        String table,
                                                        AbstractType<?> rawComparator,
@@ -605,11 +608,11 @@ public final class LegacySchemaMigrator
 
         for (UntypedResultSet.Row row : rows)
         {
-            IndexMetadata.IndexType indexType = null;
+            IndexMetadata.Kind kind = null;
             if (row.has("index_type"))
-                indexType = IndexMetadata.IndexType.valueOf(row.getString("index_type"));
+                kind = IndexMetadata.Kind.valueOf(row.getString("index_type"));
 
-            if (indexType == null)
+            if (kind == null)
                 continue;
 
             Map<String, String> indexOptions = null;
@@ -630,7 +633,7 @@ public final class LegacySchemaMigrator
                                                                 isStaticCompactTable,
                                                                 needsUpgrade);
 
-            indexes.add(IndexMetadata.singleColumnIndex(column, indexName, indexType, indexOptions));
+            indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions));
         }
 
         return indexes.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index c922612..d8992bd 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -177,10 +177,8 @@ public final class SchemaKeyspace
                 + "keyspace_name text,"
                 + "table_name text,"
                 + "index_name text,"
-                + "index_type text,"
+                + "kind text,"
                 + "options frozen<map<text, text>>,"
-                + "target_columns frozen<set<text>>,"
-                + "target_type text,"
                 + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
 
     private static final CFMetaData Types =
@@ -1449,10 +1447,8 @@ public final class SchemaKeyspace
     {
         RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name);
 
-        builder.add("index_type", index.indexType.toString());
+        builder.add("kind", index.kind.toString());
         builder.frozenMap("options", index.options);
-        builder.frozenSet("target_columns", index.columns.stream().map(ColumnIdentifier::toString).collect(Collectors.toSet()));
-        builder.add("target_type", index.targetType.toString());
         builder.build();
     }
 
@@ -1481,46 +1477,16 @@ public final class SchemaKeyspace
     {
         Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder();
         String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES);
-        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(cfm, row)));
+        QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row)));
         return indexes.build();
     }
 
-    private static IndexMetadata createIndexMetadataFromIndexesRow(CFMetaData cfm, UntypedResultSet.Row row)
+    private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row)
     {
         String name = row.getString("index_name");
-        IndexMetadata.IndexType type = IndexMetadata.IndexType.valueOf(row.getString("index_type"));
-        IndexMetadata.TargetType targetType = IndexMetadata.TargetType.valueOf(row.getString("target_type"));
+        IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind"));
         Map<String, String> options = row.getFrozenTextMap("options");
-        if (options == null)
-            options = Collections.emptyMap();
-
-        Set<String> targetColumnNames = row.getFrozenSet("target_columns", UTF8Type.instance);
-        assert targetType == IndexMetadata.TargetType.COLUMN : "Per row indexes with dynamic target columns are not supported yet";
-
-        Set<ColumnIdentifier> targetColumns = new HashSet<>();
-        // if it's not a CQL table, we can't assume that the column name is utf8, so
-        // in that case we have to do a linear scan of the cfm's columns to get the matching one
-        if (targetColumnNames != null)
-        {
-            assert targetColumnNames.size() == 1 : "Secondary indexes targetting multiple columns are not supported yet";
-            targetColumnNames.forEach(targetColumnName -> {
-                if (cfm.isCQLTable())
-                    targetColumns.add(ColumnIdentifier.getInterned(targetColumnName, true));
-                else
-                    findColumnIdentifierWithName(targetColumnName, cfm.allColumns()).ifPresent(targetColumns::add);
-            });
-        }
-        return IndexMetadata.singleColumnIndex(targetColumns.iterator().next(), name, type, options);
-    }
-
-    private static Optional<ColumnIdentifier> findColumnIdentifierWithName(String name,
-                                                                           Iterable<ColumnDefinition> columns)
-    {
-        for (ColumnDefinition column : columns)
-            if (column.name.toString().equals(name))
-                return Optional.of(column.name);
-
-        return Optional.empty();
+        return IndexMetadata.fromSchemaMetadata(name, type, options);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index b721226..80b6447 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.thrift;
 
 import java.util.*;
+import java.util.regex.Matcher;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -26,6 +27,7 @@ import com.google.common.collect.Maps;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.LegacyLayout;
 import org.apache.cassandra.db.WriteType;
@@ -33,6 +35,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.LocalStrategy;
@@ -287,7 +290,8 @@ public class ThriftConversion
                                                    DatabaseDescriptor.getPartitioner());
 
             // Convert any secondary indexes defined in the thrift column_metadata
-            newCFMD.indexes(indexDefsFromThrift(cf_def.keyspace,
+            newCFMD.indexes(indexDefsFromThrift(newCFMD,
+                                                cf_def.keyspace,
                                                 cf_def.name,
                                                 rawComparator,
                                                 subComparator,
@@ -526,7 +530,8 @@ public class ThriftConversion
         return defs;
     }
 
-    private static Indexes indexDefsFromThrift(String ksName,
+    private static Indexes indexDefsFromThrift(CFMetaData cfm,
+                                               String ksName,
                                                String cfName,
                                                AbstractType<?> thriftComparator,
                                                AbstractType<?> thriftSubComparator,
@@ -554,12 +559,12 @@ public class ThriftConversion
                 indexNames.add(indexName);
 
                 Map<String, String> indexOptions = def.getIndex_options();
-                IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(def.index_type.name());
+                if (indexOptions != null && indexOptions.containsKey(IndexTarget.TARGET_OPTION_NAME))
+                    throw new ConfigurationException("Reserved index option 'target' cannot be used");
 
-                indexes.add(IndexMetadata.singleColumnIndex(column,
-                                                            indexName,
-                                                            indexType,
-                                                            indexOptions));
+                IndexMetadata.Kind kind = IndexMetadata.Kind.valueOf(def.index_type.name());
+
+                indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions));
             }
         }
         return indexes.build();
@@ -572,22 +577,44 @@ public class ThriftConversion
 
         cd.setName(ByteBufferUtil.clone(column.name.bytes));
         cd.setValidation_class(column.type.toString());
-        Collection<IndexMetadata> indexes = cfMetaData.getIndexes().get(column);
-        // we include the index in the ColumnDef iff
-        //   * it is the only index on the column
-        //   * it is the only target column for the index
-        if (indexes.size() == 1)
+
+        // we include the index in the ColumnDef iff its targets are compatible with
+        // pre-3.0 indexes AND it is the only index defined on the given column, that is:
+        //   * it is the only index on the column (i.e. with this column as its target)
+        //   * it has only a single target, which matches the pattern for pre-3.0 indexes
+        //     i.e. keys/values/entries/full, with exactly 1 argument that matches the
+        //     column name OR a simple column name (for indexes on non-collection columns)
+        // n.b. it's a guess that using a pre-compiled regex and checking the group is
+        // cheaper than compiling a new regex for each column, but as this isn't on
+        // any hot path this hasn't been verified yet.
+        IndexMetadata matchedIndex = null;
+        for (IndexMetadata index : cfMetaData.getIndexes())
         {
-            IndexMetadata index = indexes.iterator().next();
-            if (index.columns.size() == 1)
+            String target = index.options.get(IndexTarget.TARGET_OPTION_NAME);
+            Matcher m = CassandraIndex.TARGET_REGEX.matcher(target);
+            if (target.equals(column.name.toString()) ||
+                (m.matches() && m.group(2).equals(column.name.toString())))
             {
-                cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(index.indexType.name()));
-                cd.setIndex_name(index.name);
-                cd.setIndex_options(index.options == null || index.options.isEmpty()
-                                    ? null
-                                    : Maps.newHashMap(index.options));
+                // we already found an index for this column, we've no option but to
+                // ignore both of them (and any others we've yet to find)
+                if (matchedIndex != null)
+                    return cd;
+
+                matchedIndex = index;
             }
         }
+
+        if (matchedIndex != null)
+        {
+            cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(matchedIndex.kind.name()));
+            cd.setIndex_name(matchedIndex.name);
+            // 'target' is a reserved option (thrift input containing it is rejected), so it
+            // is stripped here: copy the filtered view, not the unfiltered index options
+            Map<String, String> filteredOptions = Maps.filterKeys(matchedIndex.options,
+                                                                  s -> !IndexTarget.TARGET_OPTION_NAME.equals(s));
+            cd.setIndex_options(filteredOptions.isEmpty() ? null : Maps.newHashMap(filteredOptions));
+        }
+
         return cd;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 72bfd00..c4b99c6 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -299,9 +299,10 @@ public class SchemaLoader
 
         cfm.indexes(
             cfm.getIndexes()
-               .with(IndexMetadata.singleColumnIndex(indexedColumn,
+               .with(IndexMetadata.singleTargetIndex(cfm,
+                                                     new IndexTarget(indexedColumn.name, IndexTarget.Type.VALUES),
                                                      "indexe1",
-                                                     IndexMetadata.IndexType.CUSTOM,
+                                                     IndexMetadata.Kind.CUSTOM,
                                                      indexOptions)));
         return cfm;
     }
@@ -410,9 +411,11 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                   .with(IndexMetadata.singleTargetIndex(cfm,
+                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                         IndexTarget.Type.VALUES),
                                                          "birthdate_key_index",
-                                                         IndexMetadata.IndexType.COMPOSITES,
+                                                         IndexMetadata.Kind.COMPOSITES,
                                                          Collections.EMPTY_MAP)));
 
         return cfm.compression(getCompressionParameters());
@@ -430,9 +433,11 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                   .with(IndexMetadata.singleTargetIndex(cfm,
+                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                         IndexTarget.Type.VALUES),
                                                          "birthdate_composite_index",
-                                                         IndexMetadata.IndexType.KEYS,
+                                                         IndexMetadata.Kind.KEYS,
                                                          Collections.EMPTY_MAP)));
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index e07e421..70f7f19 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -601,8 +601,10 @@ public class FrozenCollectionsTest extends CQLTester
 
         // for now, we don't support indexing values or keys of collections in the primary key
         assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (full(a))", "Cannot create secondary index on partition key column");
-        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create index on keys of frozen<map> column");
-        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(b))", "Cannot create index on keys of frozen<map> column");
+        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create keys() index on frozen column a. " +
+                                                                              "Frozen collections only support full() indexes");
+        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(b))", "Cannot create keys() index on frozen column b. " +
+                                                                              "Frozen collections only support full() indexes");
 
         createTable("CREATE TABLE %s (a int, b frozen<list<int>>, c frozen<set<int>>, d frozen<map<int, text>>, PRIMARY KEY (a, b))");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 02b2abd..48d3a85 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -27,15 +27,19 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
 
 import static org.apache.cassandra.Util.throwAssert;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -209,7 +213,7 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalidThrow(SyntaxException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression_parameters:sstable_compressor = 'DeflateCompressor'", tableName));
 
         assertInvalidThrow(ConfigurationException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression = { 'sstable_compressor': 'DeflateCompressor' }",
-                                                                      tableName));
+                                                                       tableName));
     }
 
     /**
@@ -462,6 +466,67 @@ public class SecondaryIndexTest extends CQLTester
         assertRows(execute("select count(*) from %s where app_name='foo' and account='bar' and last_access > 4 allow filtering"), row(1L));
     }
 
+    @Test
+    public void testSyntaxVariationsForIndexOnCollectionsValue() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, m map<int, int>, l list<int>, s set<int>, PRIMARY KEY (k))");
+        createAndDropCollectionValuesIndex("m");
+        createAndDropCollectionValuesIndex("l");
+        createAndDropCollectionValuesIndex("s");
+    }
+
+    private void createAndDropCollectionValuesIndex(String columnName) throws Throwable
+    {
+        String indexName = columnName + "_idx";
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        createIndex(String.format("CREATE INDEX %s on %%s(%s)", indexName, columnName));
+        IndexMetadata indexDef = indexManager.getIndexByName(indexName).getIndexMetadata();
+        assertEquals(String.format("values(%s)", columnName), indexDef.options.get(IndexTarget.TARGET_OPTION_NAME));
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+        assertFalse(indexManager.hasIndexes());
+        createIndex(String.format("CREATE INDEX %s on %%s(values(%s))", indexName, columnName));
+        assertEquals(indexDef, indexManager.getIndexByName(indexName).getIndexMetadata());
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+    }
+
+    @Test
+    public void testCreateIndexWithQuotedColumnNames() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    " k int," +
+                    " v int, " +
+                    " lower_case_map map<int, int>," +
+                    " \"MixedCaseMap\" map<int, int>," +
+                    " lower_case_frozen_list frozen<list<int>>," +
+                    " \"UPPER_CASE_FROZEN_LIST\" frozen<list<int>>," +
+                    " \"set name with spaces\" set<int>," +
+                    " \"column_name_with\"\"escaped quote\" int," +
+                    " PRIMARY KEY (k))");
+
+        createAndDropIndexWithQuotedColumnIdentifier("\"v\"");
+        createAndDropIndexWithQuotedColumnIdentifier("keys(\"lower_case_map\")");
+        createAndDropIndexWithQuotedColumnIdentifier("keys(\"MixedCaseMap\")");
+        createAndDropIndexWithQuotedColumnIdentifier("full(\"lower_case_frozen_list\")");
+        createAndDropIndexWithQuotedColumnIdentifier("full(\"UPPER_CASE_FROZEN_LIST\")");
+        createAndDropIndexWithQuotedColumnIdentifier("values(\"set name with spaces\")");
+        createAndDropIndexWithQuotedColumnIdentifier("\"column_name_with\"\"escaped quote\"");
+    }
+
+    private void createAndDropIndexWithQuotedColumnIdentifier(String target) throws Throwable
+    {
+        String indexName = "test_mixed_case_idx";
+        createIndex(String.format("CREATE INDEX %s ON %%s(%s)", indexName, target));
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        IndexMetadata indexDef = indexManager.getIndexByName(indexName).getIndexMetadata();
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+        // verify we can re-create the index using the target string
+        createIndex(String.format("CREATE INDEX %s ON %%s(%s)",
+                                  indexName, indexDef.options.get(IndexTarget.TARGET_OPTION_NAME)));
+        assertEquals(indexDef, indexManager.getIndexByName(indexName).getIndexMetadata());
+        dropIndex(String.format("DROP INDEX %s.%s", KEYSPACE, indexName));
+    }
+
+
     /**
      * Test for CASSANDRA-5732, Can not query secondary index
      * migrated from cql_tests.py:TestCQL.bug_5732_test(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 7b03640..a4aca7f 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -114,9 +114,7 @@ public class CleanupTest
         assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
 
         ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
-        String indexName = cfs.metadata.getIndexes()
-                                       .get(cdef)
-                                       .iterator().next().name;
+        String indexName = "birthdate_key_index";
         long start = System.nanoTime();
         while (!cfs.getBuiltIndexes().contains(indexName) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
             Thread.sleep(10);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 40093ea..6e51448 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.index.internal.CassandraIndex;
@@ -168,9 +169,10 @@ public class DirectoriesTest
                                   .addClusteringColumn("col", UTF8Type.instance)
                                   .build();
         ColumnDefinition col = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(col,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(PARENT_CFM,
+                                                                 new IndexTarget(col.name, IndexTarget.Type.VALUES),
                                                                  "idx",
-                                                                 IndexMetadata.IndexType.KEYS,
+                                                                 IndexMetadata.Kind.KEYS,
                                                                  Collections.emptyMap());
         PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
         CFMetaData INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fde97c3b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index ffc43c5..e0fc68a 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -464,9 +464,10 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(cd,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
+                                                                 new IndexTarget(cd.name, IndexTarget.Type.VALUES),
                                                                  "test_index",
-                                                                 IndexMetadata.IndexType.CUSTOM,
+                                                                 IndexMetadata.Kind.CUSTOM,
                                                                  ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
                                                                                  StubIndex.class.getName()));
 
@@ -560,9 +561,10 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(cd,
+        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
+                                                                 new IndexTarget(cd.name,IndexTarget.Type.VALUES),
                                                                  "test_index",
-                                                                 IndexMetadata.IndexType.CUSTOM,
+                                                                 IndexMetadata.Kind.CUSTOM,
                                                                  ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
                                                                                  StubIndex.class.getName()));
 
@@ -574,11 +576,7 @@ public class RangeTombstoneTest
         if (rebuild != null)
             rebuild.get();
 
-        StubIndex index = (StubIndex)cfs.indexManager.listIndexes()
-                                                     .stream()
-                                                     .filter(i -> "test_index".equals(i.getIndexName()))
-                                                     .findFirst()
-                                                     .orElseThrow(() -> new RuntimeException(new AssertionError("Index not found")));
+        StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("test_index");
         index.reset();
 
         UpdateBuilder.create(cfs.metadata, key).withTimestamp(0).newRow(1).add("val", 1).applyUnsafe();