You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/18 12:09:24 UTC

[1/3] cassandra git commit: Support for indexes with 0, 1 or multiple targets

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 aeee9f74d -> df3972e94
  refs/heads/trunk b653411f0 -> 82dfc290d


Support for indexes with 0, 1 or multiple targets

Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for
CASSANDRA-10124


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/df3972e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/df3972e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/df3972e9

Branch: refs/heads/cassandra-3.0
Commit: df3972e9496599f26b76d73df9f4e14f6877c0ce
Parents: aeee9f7
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Sep 11 17:11:17 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Sep 18 11:02:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/cql3/Cql.g        |  17 +-
 .../cql3/statements/CreateIndexStatement.java   | 113 +++---
 .../apache/cassandra/schema/IndexMetadata.java  |  29 +-
 .../org/apache/cassandra/schema/Indexes.java    |   5 +-
 .../cassandra/thrift/ThriftConversion.java      |   2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  31 +-
 .../entities/FrozenCollectionsTest.java         |  20 +-
 .../apache/cassandra/db/DirectoriesTest.java    |  11 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  30 +-
 .../apache/cassandra/db/SecondaryIndexTest.java |  11 +-
 .../apache/cassandra/index/CustomIndexTest.java | 365 +++++++++++++++++++
 .../index/internal/CustomIndexTest.java         | 114 ------
 13 files changed, 518 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1ba955..afe7dcb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Allow custom indexes with 0,1 or multiple target columns (CASSANDRA-10124)
  * Improve MV schema representation (CASSANDRA-9921)
  * Add flag to enable/disable coordinator batchlog for MV writes (CASSANDRA-10230)
  * Update cqlsh COPY for new internal driver serialization interface (CASSANDRA-10318)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 348c355..afef224 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -717,20 +717,21 @@ createIndexStatement returns [CreateIndexStatement expr]
         IndexPropDefs props = new IndexPropDefs();
         boolean ifNotExists = false;
         IndexName name = new IndexName();
+        List<IndexTarget.Raw> targets = new ArrayList<>();
     }
     : K_CREATE (K_CUSTOM { props.isCustom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { ifNotExists = true; } )?
-        (idxName[name])? K_ON cf=columnFamilyName '(' id=indexIdent ')'
+        (idxName[name])? K_ON cf=columnFamilyName '(' (indexIdent[targets] (',' indexIdent[targets])*)? ')'
         (K_USING cls=STRING_LITERAL { props.customClass = $cls.text; })?
         (K_WITH properties[props])?
-      { $expr = new CreateIndexStatement(cf, name, id, props, ifNotExists); }
+      { $expr = new CreateIndexStatement(cf, name, targets, props, ifNotExists); }
     ;
 
-indexIdent returns [IndexTarget.Raw id]
-    : c=cident                   { $id = IndexTarget.Raw.simpleIndexOn(c); }
-    | K_VALUES '(' c=cident ')'  { $id = IndexTarget.Raw.valuesOf(c); }
-    | K_KEYS '(' c=cident ')'    { $id = IndexTarget.Raw.keysOf(c); }
-    | K_ENTRIES '(' c=cident ')' { $id = IndexTarget.Raw.keysAndValuesOf(c); }
-    | K_FULL '(' c=cident ')'    { $id = IndexTarget.Raw.fullCollection(c); }
+indexIdent [List<IndexTarget.Raw> targets]
+    : c=cident                   { $targets.add(IndexTarget.Raw.simpleIndexOn(c)); }
+    | K_VALUES '(' c=cident ')'  { $targets.add(IndexTarget.Raw.valuesOf(c)); }
+    | K_KEYS '(' c=cident ')'    { $targets.add(IndexTarget.Raw.keysOf(c)); }
+    | K_ENTRIES '(' c=cident ')' { $targets.add(IndexTarget.Raw.keysAndValuesOf(c)); }
+    | K_FULL '(' c=cident ')'    { $targets.add(IndexTarget.Raw.fullCollection(c)); }
     ;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 6cc416d..bd6f0c3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -17,8 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Collections;
-import java.util.Map;
+import java.util.*;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
@@ -31,6 +30,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.IndexName;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -49,19 +49,19 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     private static final Logger logger = LoggerFactory.getLogger(CreateIndexStatement.class);
 
     private final String indexName;
-    private final IndexTarget.Raw rawTarget;
+    private final List<IndexTarget.Raw> rawTargets;
     private final IndexPropDefs properties;
     private final boolean ifNotExists;
 
     public CreateIndexStatement(CFName name,
                                 IndexName indexName,
-                                IndexTarget.Raw target,
+                                List<IndexTarget.Raw> targets,
                                 IndexPropDefs properties,
                                 boolean ifNotExists)
     {
         super(name);
         this.indexName = indexName.getIdx();
-        this.rawTarget = target;
+        this.rawTargets = targets;
         this.properties = properties;
         this.ifNotExists = ifNotExists;
     }
@@ -74,24 +74,58 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     public void validate(ClientState state) throws RequestValidationException
     {
         CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+
         if (cfm.isCounter())
             throw new InvalidRequestException("Secondary indexes are not supported on counter tables");
 
-        IndexTarget target = rawTarget.prepare(cfm);
-        ColumnDefinition cd = cfm.getColumnDefinition(target.column);
+        if (cfm.isCompactTable() && !cfm.isStaticCompactTable())
+            throw new InvalidRequestException("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns");
 
-        boolean isMap = cd.type instanceof MapType;
-        boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
+        List<IndexTarget> targets = new ArrayList<>(rawTargets.size());
+        for (IndexTarget.Raw rawTarget : rawTargets)
+            targets.add(rawTarget.prepare(cfm));
 
-        if (isFrozenCollection)
-        {
-            validateForFrozenCollection(target);
-        }
-        else
+        if (targets.isEmpty() && !properties.isCustom)
+            throw new InvalidRequestException("Only CUSTOM indexes can be created without specifying a target column");
+
+        if (targets.size() > 1)
+            validateTargetsForMultiColumnIndex(targets);
+
+        for (IndexTarget target : targets)
         {
-            validateNotFullIndex(target);
-            validateIsSimpleIndexIfTargetColumnNotCollection(cd, target);
-            validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
+            ColumnDefinition cd = cfm.getColumnDefinition(target.column);
+
+            if (cd == null)
+                throw new InvalidRequestException("No column definition found for column " + target.column);
+
+            // TODO: we could lift that limitation
+            if (cfm.isCompactTable() && cd.isPrimaryKeyColumn())
+                throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
+
+            // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
+            // CompositesIndex) and maybe we should, but that means a query like:
+            //     SELECT * FROM foo WHERE static_column = 'bar'
+            // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
+            // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
+            // such indexing is actually useful.
+            if (!cfm.isCompactTable() && cd.isStatic())
+                throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
+
+            if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
+                throw new InvalidRequestException(String.format("Cannot create secondary index on partition key column %s", target.column));
+
+            boolean isMap = cd.type instanceof MapType;
+            boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
+            if (isFrozenCollection)
+            {
+                validateForFrozenCollection(target);
+            }
+            else
+            {
+                validateNotFullIndex(target);
+                validateIsSimpleIndexIfTargetColumnNotCollection(cd, target);
+                validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
+            }
         }
 
         if (!Strings.isNullOrEmpty(indexName))
@@ -106,27 +140,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         }
 
         properties.validate();
-
-        if (cfm.isCompactTable())
-        {
-            if (!cfm.isStaticCompactTable())
-                throw new InvalidRequestException("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns");
-            else if (cd.isPrimaryKeyColumn())
-                // TODO: we could lift that limitation
-                throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
-        }
-
-        // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
-        // CompositesIndex) and maybe we should, but that means a query like:
-        //     SELECT * FROM foo WHERE static_column = 'bar'
-        // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
-        // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
-        // such indexing is actually useful.
-        if (!cfm.isCompactTable() && cd.isStatic())
-            throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
-
-        if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
-            throw new InvalidRequestException(String.format("Cannot create secondary index on partition key column %s", target.column));
     }
 
     private void validateForFrozenCollection(IndexTarget target) throws InvalidRequestException
@@ -161,15 +174,31 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         }
     }
 
+    private void validateTargetsForMultiColumnIndex(List<IndexTarget> targets)
+    {
+        if (!properties.isCustom)
+            throw new InvalidRequestException("Only CUSTOM indexes support multiple columns");
+
+        Set<ColumnIdentifier> columns = new HashSet<>();
+        for (IndexTarget target : targets)
+            if (!columns.add(target.column))
+                throw new InvalidRequestException("Duplicate column " + target.column + " in index target list");
+    }
+
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily()).copy();
-        IndexTarget target = rawTarget.prepare(cfm);
+        List<IndexTarget> targets = new ArrayList<>(rawTargets.size());
+        for (IndexTarget.Raw rawTarget : rawTargets)
+            targets.add(rawTarget.prepare(cfm));
 
-        ColumnDefinition cd = cfm.getColumnDefinition(target.column);
         String acceptedName = indexName;
         if (Strings.isNullOrEmpty(acceptedName))
-            acceptedName = Indexes.getAvailableIndexName(keyspace(), columnFamily(), cd.name);
+        {
+            acceptedName = Indexes.getAvailableIndexName(keyspace(),
+                                                         columnFamily(),
+                                                         targets.size() == 1 ? targets.get(0).column.toString() : null);
+        }
 
         if (Schema.instance.getKSMetaData(keyspace()).existingIndexNames(null).contains(acceptedName))
         {
@@ -192,7 +221,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             kind = cfm.isCompound() ? IndexMetadata.Kind.COMPOSITES : IndexMetadata.Kind.KEYS;
         }
 
-        IndexMetadata index = IndexMetadata.singleTargetIndex(cfm, target, acceptedName, kind, indexOptions);
+        IndexMetadata index = IndexMetadata.fromIndexTargets(cfm, targets, acceptedName, kind, indexOptions);
 
         // check to disallow creation of an index which duplicates an existing one in all but name
         Optional<IndexMetadata> existingIndex = Iterables.tryFind(cfm.getIndexes(), existing -> existing.equalsWithoutName(index));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 8052e9e..ee9179a 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.schema;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.index.Index;
@@ -115,14 +112,16 @@ public final class IndexMetadata
         return new IndexMetadata(name, options, kind);
     }
 
-    public static IndexMetadata singleTargetIndex(CFMetaData cfm,
-                                                  IndexTarget target,
-                                                  String name,
-                                                  Kind kind,
-                                                  Map<String, String> options)
+    public static IndexMetadata fromIndexTargets(CFMetaData cfm,
+                                                 List<IndexTarget> targets,
+                                                 String name,
+                                                 Kind kind,
+                                                 Map<String, String> options)
     {
         Map<String, String> newOptions = new HashMap<>(options);
-        newOptions.put(IndexTarget.TARGET_OPTION_NAME, target.asCqlString(cfm));
+        newOptions.put(IndexTarget.TARGET_OPTION_NAME, targets.stream()
+                                                              .map(target -> target.asCqlString(cfm))
+                                                              .collect(Collectors.joining(", ")));
         return new IndexMetadata(name, newOptions, kind);
     }
 
@@ -131,10 +130,12 @@ public final class IndexMetadata
         return name != null && !name.isEmpty() && name.matches("\\w+");
     }
 
-    // these will go away as part of #9459 as we enable real per-row indexes
-    public static String getDefaultIndexName(String cfName, ColumnIdentifier columnName)
+    public static String getDefaultIndexName(String cfName, String root)
     {
-        return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
+        if (root == null)
+            return (cfName + "_" + "idx").replaceAll("\\W", "");
+        else
+            return (cfName + "_" + root + "_idx").replaceAll("\\W", "");
     }
 
     public void validate()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 422a94c..49a1d3b 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -23,7 +23,6 @@ import java.util.*;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 
 import static com.google.common.collect.Iterables.filter;
 
@@ -162,12 +161,12 @@ public class Indexes implements Iterable<IndexMetadata>
         return indexesByName.values().toString();
     }
 
-    public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
+    public static String getAvailableIndexName(String ksName, String cfName, String indexNameRoot)
     {
 
         KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
         Set<String> existingNames = ksm == null ? new HashSet<>() : ksm.existingIndexNames(null);
-        String baseName = IndexMetadata.getDefaultIndexName(cfName, columnName);
+        String baseName = IndexMetadata.getDefaultIndexName(cfName, indexNameRoot);
         String acceptedName = baseName;
         int i = 0;
         while (existingNames.contains(acceptedName))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 80b6447..005d6c5 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -551,7 +551,7 @@ public class ThriftConversion
                 String indexName = def.getIndex_name();
                 // add a generated index name if none was supplied
                 if (Strings.isNullOrEmpty(indexName))
-                    indexName = Indexes.getAvailableIndexName(ksName, cfName, column.name);
+                    indexName = Indexes.getAvailableIndexName(ksName, cfName, column.name.toString());
 
                 if (indexNames.contains(indexName))
                     throw new ConfigurationException("Duplicate index name " + indexName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index c4b99c6..f2c01e4 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -299,11 +299,12 @@ public class SchemaLoader
 
         cfm.indexes(
             cfm.getIndexes()
-               .with(IndexMetadata.singleTargetIndex(cfm,
-                                                     new IndexTarget(indexedColumn.name, IndexTarget.Type.VALUES),
-                                                     "indexe1",
-                                                     IndexMetadata.Kind.CUSTOM,
-                                                     indexOptions)));
+               .with(IndexMetadata.fromIndexTargets(cfm,
+                                                    Collections.singletonList(new IndexTarget(indexedColumn.name,
+                                                                                              IndexTarget.Type.VALUES)),
+                                                    "indexe1",
+                                                    IndexMetadata.Kind.CUSTOM,
+                                                    indexOptions)));
         return cfm;
     }
 
@@ -411,12 +412,13 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleTargetIndex(cfm,
-                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
-                                                                         IndexTarget.Type.VALUES),
-                                                         "birthdate_key_index",
-                                                         IndexMetadata.Kind.COMPOSITES,
-                                                         Collections.EMPTY_MAP)));
+                   .with(IndexMetadata.fromIndexTargets(cfm,
+                                                        Collections.singletonList(
+                                                            new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                            IndexTarget.Type.VALUES)),
+                                                        "birthdate_key_index",
+                                                        IndexMetadata.Kind.COMPOSITES,
+                                                        Collections.EMPTY_MAP)));
 
         return cfm.compression(getCompressionParameters());
     }
@@ -433,9 +435,10 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleTargetIndex(cfm,
-                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
-                                                                         IndexTarget.Type.VALUES),
+                   .with(IndexMetadata.fromIndexTargets(cfm,
+                                                        Collections.singletonList(
+                                                            new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                            IndexTarget.Type.VALUES)),
                                                          "birthdate_composite_index",
                                                          IndexMetadata.Kind.KEYS,
                                                          Collections.EMPTY_MAP)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index 70f7f19..a0d64be 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -17,6 +17,15 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -24,14 +33,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.service.StorageService;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -601,8 +602,7 @@ public class FrozenCollectionsTest extends CQLTester
 
         // for now, we don't support indexing values or keys of collections in the primary key
         assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (full(a))", "Cannot create secondary index on partition key column");
-        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create keys() index on frozen column a. " +
-                                                                              "Frozen collections only support full() indexes");
+        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create secondary index on partition key column");
         assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(b))", "Cannot create keys() index on frozen column b. " +
                                                                               "Frozen collections only support full() indexes");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 6e51448..8732881 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -169,11 +169,12 @@ public class DirectoriesTest
                                   .addClusteringColumn("col", UTF8Type.instance)
                                   .build();
         ColumnDefinition col = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(PARENT_CFM,
-                                                                 new IndexTarget(col.name, IndexTarget.Type.VALUES),
-                                                                 "idx",
-                                                                 IndexMetadata.Kind.KEYS,
-                                                                 Collections.emptyMap());
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(PARENT_CFM,
+                                           Collections.singletonList(new IndexTarget(col.name, IndexTarget.Type.VALUES)),
+                                           "idx",
+                                           IndexMetadata.Kind.KEYS,
+                                           Collections.emptyMap());
         PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
         CFMetaData INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);
         Directories parentDirectories = new Directories(PARENT_CFM);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index e0fc68a..95f38c1 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -19,9 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -464,12 +462,13 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
-                                                                 new IndexTarget(cd.name, IndexTarget.Type.VALUES),
-                                                                 "test_index",
-                                                                 IndexMetadata.Kind.CUSTOM,
-                                                                 ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                                                 StubIndex.class.getName()));
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(cfs.metadata,
+                                           Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
+                                           "test_index",
+                                           IndexMetadata.Kind.CUSTOM,
+                                           ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+                                                           StubIndex.class.getName()));
 
         if (!cfs.metadata.getIndexes().get("test_index").isPresent())
             cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
@@ -561,12 +560,13 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
-                                                                 new IndexTarget(cd.name,IndexTarget.Type.VALUES),
-                                                                 "test_index",
-                                                                 IndexMetadata.Kind.CUSTOM,
-                                                                 ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                                                 StubIndex.class.getName()));
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(cfs.metadata,
+                                           Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
+                                           "test_index",
+                                           IndexMetadata.Kind.CUSTOM,
+                                           ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+                                                           StubIndex.class.getName()));
 
         if (!cfs.metadata.getIndexes().get("test_index").isPresent())
             cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index 2a43e33..55ab574 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -438,11 +438,12 @@ public class SecondaryIndexTest
 
         String indexName = "birthdate_index";
         ColumnDefinition old = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
-                                                                 new IndexTarget(old.name, IndexTarget.Type.VALUES),
-                                                                 indexName,
-                                                                 IndexMetadata.Kind.COMPOSITES,
-                                                                 Collections.EMPTY_MAP);
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(cfs.metadata,
+                                           Collections.singletonList(new IndexTarget(old.name, IndexTarget.Type.VALUES)),
+                                           indexName,
+                                           IndexMetadata.Kind.COMPOSITES,
+                                           Collections.EMPTY_MAP);
         cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
         Future<?> future = cfs.indexManager.addIndex(indexDef);
         future.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
new file mode 100644
index 0000000..4497364
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -0,0 +1,365 @@
+package org.apache.cassandra.index;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
+
+import static org.apache.cassandra.Util.throwAssert;
+import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CustomIndexTest extends CQLTester
+{
+    @Test
+    public void testInsertsOnCfsBackedIndex() throws Throwable
+    {
+        // test to ensure that we don't deadlock when flushing CFS backed custom indexers
+        // see CASSANDRA-10181
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+        createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 1);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 2, 0, 0);
+    }
+
+    @Test
+    public void indexControlsIfIncludedInBuildOnNewSSTables() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, PRIMARY KEY (a))");
+        String toInclude = "include";
+        String toExclude = "exclude";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
+                                  toInclude, IndexIncludedInBuild.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
+                                  toExclude, IndexExcludedFromBuild.class.getName()));
+
+        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 0, 0);
+        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
+        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 2, 2);
+        flush();
+
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        IndexIncludedInBuild included = (IndexIncludedInBuild)indexManager.getIndexByName(toInclude);
+        included.reset();
+        assertTrue(included.rowsInserted.isEmpty());
+
+        IndexExcludedFromBuild excluded = (IndexExcludedFromBuild)indexManager.getIndexByName(toExclude);
+        excluded.reset();
+        assertTrue(excluded.rowsInserted.isEmpty());
+
+        indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
+
+        assertEquals(3, included.rowsInserted.size());
+        assertTrue(excluded.rowsInserted.isEmpty());
+    }
+
+    @Test
+    public void indexReceivesWriteTimeDeletionsCorrectly() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
+        String indexName = "test_index";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(d) USING '%s'",
+                                  indexName, StubIndex.class.getName()));
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 1);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 2, 2);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 3, 3);
+
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        StubIndex index = (StubIndex)indexManager.getIndexByName(indexName);
+        assertEquals(4, index.rowsInserted.size());
+        assertTrue(index.partitionDeletions.isEmpty());
+        assertTrue(index.rangeTombstones.isEmpty());
+
+        execute("DELETE FROM %s WHERE a=0 AND b=0");
+        assertTrue(index.partitionDeletions.isEmpty());
+        assertEquals(1, index.rangeTombstones.size());
+
+        execute("DELETE FROM %s WHERE a=0");
+        assertEquals(1, index.partitionDeletions.size());
+        assertEquals(1, index.rangeTombstones.size());
+    }
+    @Test
+    public void nonCustomIndexesRequireExactlyOneTargetColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
+
+        assertInvalidMessage("Only CUSTOM indexes support multiple columns", "CREATE INDEX multi_idx on %s(v1,v2)");
+        assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column",
+                           "CREATE INDEX no_targets on %s()");
+
+        createIndex(String.format("CREATE CUSTOM INDEX multi_idx ON %%s(v1, v2) USING '%s'", StubIndex.class.getName()));
+        assertIndexCreated("multi_idx", "v1", "v2");
+    }
+
+    @Test
+    public void rejectDuplicateColumnsInTargetList() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
+
+        assertInvalidMessage("Duplicate column v1 in index target list",
+                             String.format("CREATE CUSTOM INDEX ON %%s(v1, v1) USING '%s'",
+                                           StubIndex.class.getName()));
+
+        assertInvalidMessage("Duplicate column v1 in index target list",
+                             String.format("CREATE CUSTOM INDEX ON %%s(v1, v1, c, c) USING '%s'",
+                                           StubIndex.class.getName()));
+    }
+
+    @Test
+    public void requireFullQualifierForFrozenCollectionTargets() throws Throwable
+    {
+        // this is really just to prove that we require the full modifier on frozen collection
+        // targets whether the index is multicolumn or not
+        createTable("CREATE TABLE %s(" +
+                    " k int," +
+                    " c int," +
+                    " fmap frozen<map<int, text>>," +
+                    " flist frozen<list<int>>," +
+                    " fset frozen<set<int>>," +
+                    " PRIMARY KEY(k,c))");
+
+        assertInvalidMessage("Cannot create keys() index on frozen column fmap. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, keys(fmap)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create entries() index on frozen column fmap. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, entries(fmap)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create values() index on frozen column fmap. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, fmap) USING'%s'", StubIndex.class.getName()));
+
+        assertInvalidMessage("Cannot create keys() index on frozen column flist. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, keys(flist)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create entries() index on frozen column flist. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, entries(flist)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create values() index on frozen column flist. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, flist) USING'%s'", StubIndex.class.getName()));
+
+        assertInvalidMessage("Cannot create keys() index on frozen column fset. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, keys(fset)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create entries() index on frozen column fset. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, entries(fset)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create values() index on frozen column fset. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, fset) USING'%s'", StubIndex.class.getName()));
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(fmap)) USING'%s'", StubIndex.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(flist)) USING'%s'", StubIndex.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(fset)) USING'%s'", StubIndex.class.getName()));
+    }
+
+    @Test
+    public void defaultIndexNameContainsTargetColumns() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(v1, v2) USING '%s'", StubIndex.class.getName()));
+        assertEquals(1, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertIndexCreated(currentTable() + "_idx", "v1", "v2");
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v1, v2) USING '%s'", StubIndex.class.getName()));
+        assertEquals(2, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertIndexCreated(currentTable() + "_idx_1", "c", "v1", "v2");
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s'", StubIndex.class.getName()));
+        assertEquals(3, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertIndexCreated(currentTable() + "_idx_2", "c", "v2");
+
+        // duplicate the previous index with some additional options and check the name is generated as expected
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}",
+                                  StubIndex.class.getName()));
+        assertEquals(4, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        Map<String, String> options = new HashMap<>();
+        options.put("foo", "bar");
+        assertIndexCreated(currentTable() + "_idx_3", options, "c", "v2");
+    }
+
+    @Test
+    public void createMultiColumnIndexes() throws Throwable
+    {
+        // smoke test for various permutations of multicolumn indexes
+        createTable("CREATE TABLE %s (" +
+                    " pk1 int," +
+                    " pk2 int," +
+                    " c1 int," +
+                    " c2 int," +
+                    " v1 int," +
+                    " v2 int," +
+                    " mval map<text, int>," +
+                    " lval list<int>," +
+                    " sval set<int>," +
+                    " fmap frozen<map<text,int>>," +
+                    " flist frozen<list<int>>," +
+                    " fset frozen<set<int>>," +
+                    " PRIMARY KEY ((pk1, pk2), c1, c2))");
+
+        testCreateIndex("idx_1", "pk1", "pk2");
+        testCreateIndex("idx_2", "pk1", "c1");
+        testCreateIndex("idx_3", "pk1", "c2");
+        testCreateIndex("idx_4", "c1", "c2");
+        testCreateIndex("idx_5", "c2", "v1");
+        testCreateIndex("idx_6", "v1", "v2");
+        testCreateIndex("idx_7", "pk2", "c2", "v2");
+        testCreateIndex("idx_8", "pk1", "c1", "v1", "mval", "sval", "lval");
+
+        createIndex(String.format("CREATE CUSTOM INDEX inc_frozen ON %%s(" +
+                                  "  pk2, c2, v2, full(fmap), full(fset), full(flist)" +
+                                  ") USING '%s'",
+                                  StubIndex.class.getName()));
+        assertIndexCreated("inc_frozen",
+                           new HashMap<>(),
+                           ImmutableList.of(indexTarget("pk2", IndexTarget.Type.VALUES),
+                                            indexTarget("c2", IndexTarget.Type.VALUES),
+                                            indexTarget("v2", IndexTarget.Type.VALUES),
+                                            indexTarget("fmap", IndexTarget.Type.FULL),
+                                            indexTarget("fset", IndexTarget.Type.FULL),
+                                            indexTarget("flist", IndexTarget.Type.FULL)));
+
+        createIndex(String.format("CREATE CUSTOM INDEX all_teh_things ON %%s(" +
+                                  "  pk1, pk2, c1, c2, v1, v2, keys(mval), lval, sval, full(fmap), full(fset), full(flist)" +
+                                  ") USING '%s'",
+                                  StubIndex.class.getName()));
+        assertIndexCreated("all_teh_things",
+                           new HashMap<>(),
+                           ImmutableList.of(indexTarget("pk1", IndexTarget.Type.VALUES),
+                                            indexTarget("pk2", IndexTarget.Type.VALUES),
+                                            indexTarget("c1", IndexTarget.Type.VALUES),
+                                            indexTarget("c2", IndexTarget.Type.VALUES),
+                                            indexTarget("v1", IndexTarget.Type.VALUES),
+                                            indexTarget("v2", IndexTarget.Type.VALUES),
+                                            indexTarget("mval", IndexTarget.Type.KEYS),
+                                            indexTarget("lval", IndexTarget.Type.VALUES),
+                                            indexTarget("sval", IndexTarget.Type.VALUES),
+                                            indexTarget("fmap", IndexTarget.Type.FULL),
+                                            indexTarget("fset", IndexTarget.Type.FULL),
+                                            indexTarget("flist", IndexTarget.Type.FULL)));
+    }
+
+    @Test
+    public void createMultiColumnIndexIncludingUserTypeColumn() throws Throwable
+    {
+        String myType = KEYSPACE + '.' + createType("CREATE TYPE %s (a int, b int)");
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 frozen<" + myType + ">)");
+        testCreateIndex("udt_idx", "v1", "v2");
+        Indexes indexes = getCurrentColumnFamilyStore().metadata.getIndexes();
+        IndexMetadata expected = IndexMetadata.fromIndexTargets(getCurrentColumnFamilyStore().metadata,
+                                                                ImmutableList.of(indexTarget("v1", IndexTarget.Type.VALUES),
+                                                                                 indexTarget("v2", IndexTarget.Type.VALUES)),
+                                                                "udt_idx",
+                                                                IndexMetadata.Kind.CUSTOM,
+                                                                ImmutableMap.of(CUSTOM_INDEX_OPTION_NAME,
+                                                                                StubIndex.class.getName()));
+        IndexMetadata actual = indexes.get("udt_idx").orElseThrow(throwAssert("Index udt_idx not found"));
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void createIndexWithoutTargets() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
+        // an empty target list is only allowed for CUSTOM indexes
+        assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column",
+                             "CREATE INDEX ON %s()");
+
+        // parentheses are mandatory (class name formatted in first, like createIndex below, leaving %s for the table)
+        assertInvalidSyntax(String.format("CREATE CUSTOM INDEX ON %%s USING '%s'", StubIndex.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX no_targets ON %%s() USING '%s'", StubIndex.class.getName()));
+        assertIndexCreated("no_targets", new HashMap<>());
+    }
+
+    private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
+    {
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
+                                  indexName,
+                                  Arrays.stream(targetColumnNames).collect(Collectors.joining(",")),
+                                  StubIndex.class.getName()));
+        assertIndexCreated(indexName, targetColumnNames);
+    }
+
+    private void assertIndexCreated(String name, String... targetColumnNames)
+    {
+        assertIndexCreated(name, new HashMap<>(), targetColumnNames);
+    }
+
+    private void assertIndexCreated(String name, Map<String, String> options, String... targetColumnNames)
+    {
+        List<IndexTarget> targets = Arrays.stream(targetColumnNames)
+                                          .map(s -> new IndexTarget(ColumnIdentifier.getInterned(s, true),
+                                                                    IndexTarget.Type.VALUES))
+                                          .collect(Collectors.toList());
+        assertIndexCreated(name, options, targets);
+    }
+
+    private void assertIndexCreated(String name, Map<String, String> options, List<IndexTarget> targets)
+    {
+        // all tests here use StubIndex as the custom index class, so add that to a copy of the
+        // supplied options; copying leaves the caller's map untouched and accepts immutable maps
+        Map<String, String> expectedOptions = new HashMap<>(options);
+        expectedOptions.put(CUSTOM_INDEX_OPTION_NAME, StubIndex.class.getName());
+        CFMetaData cfm = getCurrentColumnFamilyStore().metadata;
+        IndexMetadata expected = IndexMetadata.fromIndexTargets(cfm, targets, name, IndexMetadata.Kind.CUSTOM, expectedOptions);
+        for (IndexMetadata actual : cfm.getIndexes())
+            if (actual.equals(expected))
+                return;
+
+        fail(String.format("Index %s not found in CFMetaData", expected));
+    }
+
+    private static IndexTarget indexTarget(String name, IndexTarget.Type type)
+    {
+        return new IndexTarget(ColumnIdentifier.getInterned(name, true), type);
+    }
+
+    public static final class IndexIncludedInBuild extends StubIndex
+    {
+        public IndexIncludedInBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public boolean shouldBuildBlocking()
+        {
+            return true;
+        }
+    }
+
+    public static final class IndexExcludedFromBuild extends StubIndex
+    {
+        public IndexExcludedFromBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public boolean shouldBuildBlocking()
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
deleted file mode 100644
index 2f7a10b..0000000
--- a/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.cassandra.index.internal;
-
-import org.junit.Test;
-
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.SecondaryIndexManager;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class CustomIndexTest extends CQLTester
-{
-    @Test
-    public void testInserts() throws Throwable
-    {
-        // test to ensure that we don't deadlock when flushing CFS backed custom indexers
-        // see CASSANDRA-10181
-        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
-        createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");
-
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 1);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 2, 0, 0);
-    }
-
-    @Test
-    public void indexControlsIfIncludedInBuildOnNewSSTables() throws Throwable
-    {
-        createTable("CREATE TABLE %s (a int, b int, PRIMARY KEY (a))");
-        String toInclude = "include";
-        String toExclude = "exclude";
-        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
-                                  toInclude, IndexIncludedInBuild.class.getName()));
-        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
-                                  toExclude, IndexExcludedFromBuild.class.getName()));
-
-        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 0, 0);
-        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
-        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 2, 2);
-        flush();
-
-        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
-        IndexIncludedInBuild included = (IndexIncludedInBuild)indexManager.getIndexByName(toInclude);
-        included.reset();
-        assertTrue(included.rowsInserted.isEmpty());
-
-        IndexExcludedFromBuild excluded = (IndexExcludedFromBuild)indexManager.getIndexByName(toExclude);
-        excluded.reset();
-        assertTrue(excluded.rowsInserted.isEmpty());
-
-        indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
-
-        assertEquals(3, included.rowsInserted.size());
-        assertTrue(excluded.rowsInserted.isEmpty());
-    }
-
-    @Test
-    public void indexReceivesWriteTimeDeletionsCorrectly() throws Throwable
-    {
-        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
-        String indexName = "test_index";
-        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(d) USING '%s'",
-                                  indexName, StubIndex.class.getName()));
-
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 1);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 2, 2);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 3, 3);
-
-        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
-        StubIndex index = (StubIndex)indexManager.getIndexByName(indexName);
-        assertEquals(4, index.rowsInserted.size());
-        assertTrue(index.partitionDeletions.isEmpty());
-        assertTrue(index.rangeTombstones.isEmpty());
-
-        execute("DELETE FROM %s WHERE a=0 AND b=0");
-        assertTrue(index.partitionDeletions.isEmpty());
-        assertEquals(1, index.rangeTombstones.size());
-
-        execute("DELETE FROM %s WHERE a=0");
-        assertEquals(1, index.partitionDeletions.size());
-        assertEquals(1, index.rangeTombstones.size());
-    }
-
-
-    public static final class IndexIncludedInBuild extends StubIndex
-    {
-        public IndexIncludedInBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public boolean shouldBuildBlocking()
-        {
-            return true;
-        }
-    }
-
-    public static final class IndexExcludedFromBuild extends StubIndex
-    {
-        public IndexExcludedFromBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public boolean shouldBuildBlocking()
-        {
-            return false;
-        }
-    }
-}


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by sa...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/82dfc290
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/82dfc290
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/82dfc290

Branch: refs/heads/trunk
Commit: 82dfc290db439cac73848dac2c3ff09adc02f4c5
Parents: b653411 df3972e
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Sep 18 11:05:35 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Sep 18 11:05:35 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/cql3/Cql.g        |  17 +-
 .../cql3/statements/CreateIndexStatement.java   | 113 +++---
 .../apache/cassandra/schema/IndexMetadata.java  |  29 +-
 .../org/apache/cassandra/schema/Indexes.java    |   5 +-
 .../cassandra/thrift/ThriftConversion.java      |   2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  31 +-
 .../entities/FrozenCollectionsTest.java         |  20 +-
 .../apache/cassandra/db/DirectoriesTest.java    |  11 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  30 +-
 .../apache/cassandra/db/SecondaryIndexTest.java |  11 +-
 .../apache/cassandra/index/CustomIndexTest.java | 365 +++++++++++++++++++
 .../index/internal/CustomIndexTest.java         | 114 ------
 13 files changed, 518 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/82dfc290/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a45d8ad,afe7dcb..7e4e378
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 +3.2
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.0.0-rc1
+  * Allow custom indexes with 0,1 or multiple target columns (CASSANDRA-10124)
   * Improve MV schema representation (CASSANDRA-9921)
   * Add flag to enable/disable coordinator batchlog for MV writes (CASSANDRA-10230)
   * Update cqlsh COPY for new internal driver serialization interface (CASSANDRA-10318)


[2/3] cassandra git commit: Support for indexes with 0, 1 or multiple targets

Posted by sa...@apache.org.
Support for indexes with 0,1 or multiple targets

Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for
CASSANDRA-10124


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/df3972e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/df3972e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/df3972e9

Branch: refs/heads/trunk
Commit: df3972e9496599f26b76d73df9f4e14f6877c0ce
Parents: aeee9f7
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Sep 11 17:11:17 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Sep 18 11:02:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/cql3/Cql.g        |  17 +-
 .../cql3/statements/CreateIndexStatement.java   | 113 +++---
 .../apache/cassandra/schema/IndexMetadata.java  |  29 +-
 .../org/apache/cassandra/schema/Indexes.java    |   5 +-
 .../cassandra/thrift/ThriftConversion.java      |   2 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |  31 +-
 .../entities/FrozenCollectionsTest.java         |  20 +-
 .../apache/cassandra/db/DirectoriesTest.java    |  11 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  30 +-
 .../apache/cassandra/db/SecondaryIndexTest.java |  11 +-
 .../apache/cassandra/index/CustomIndexTest.java | 365 +++++++++++++++++++
 .../index/internal/CustomIndexTest.java         | 114 ------
 13 files changed, 518 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1ba955..afe7dcb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Allow custom indexes with 0,1 or multiple target columns (CASSANDRA-10124)
  * Improve MV schema representation (CASSANDRA-9921)
  * Add flag to enable/disable coordinator batchlog for MV writes (CASSANDRA-10230)
  * Update cqlsh COPY for new internal driver serialization interface (CASSANDRA-10318)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 348c355..afef224 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -717,20 +717,21 @@ createIndexStatement returns [CreateIndexStatement expr]
         IndexPropDefs props = new IndexPropDefs();
         boolean ifNotExists = false;
         IndexName name = new IndexName();
+        List<IndexTarget.Raw> targets = new ArrayList<>();
     }
     : K_CREATE (K_CUSTOM { props.isCustom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { ifNotExists = true; } )?
-        (idxName[name])? K_ON cf=columnFamilyName '(' id=indexIdent ')'
+        (idxName[name])? K_ON cf=columnFamilyName '(' (indexIdent[targets] (',' indexIdent[targets])*)? ')'
         (K_USING cls=STRING_LITERAL { props.customClass = $cls.text; })?
         (K_WITH properties[props])?
-      { $expr = new CreateIndexStatement(cf, name, id, props, ifNotExists); }
+      { $expr = new CreateIndexStatement(cf, name, targets, props, ifNotExists); }
     ;
 
-indexIdent returns [IndexTarget.Raw id]
-    : c=cident                   { $id = IndexTarget.Raw.simpleIndexOn(c); }
-    | K_VALUES '(' c=cident ')'  { $id = IndexTarget.Raw.valuesOf(c); }
-    | K_KEYS '(' c=cident ')'    { $id = IndexTarget.Raw.keysOf(c); }
-    | K_ENTRIES '(' c=cident ')' { $id = IndexTarget.Raw.keysAndValuesOf(c); }
-    | K_FULL '(' c=cident ')'    { $id = IndexTarget.Raw.fullCollection(c); }
+indexIdent [List<IndexTarget.Raw> targets]
+    : c=cident                   { $targets.add(IndexTarget.Raw.simpleIndexOn(c)); }
+    | K_VALUES '(' c=cident ')'  { $targets.add(IndexTarget.Raw.valuesOf(c)); }
+    | K_KEYS '(' c=cident ')'    { $targets.add(IndexTarget.Raw.keysOf(c)); }
+    | K_ENTRIES '(' c=cident ')' { $targets.add(IndexTarget.Raw.keysAndValuesOf(c)); }
+    | K_FULL '(' c=cident ')'    { $targets.add(IndexTarget.Raw.fullCollection(c)); }
     ;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 6cc416d..bd6f0c3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -17,8 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Collections;
-import java.util.Map;
+import java.util.*;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
@@ -31,6 +30,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.IndexName;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -49,19 +49,19 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     private static final Logger logger = LoggerFactory.getLogger(CreateIndexStatement.class);
 
     private final String indexName;
-    private final IndexTarget.Raw rawTarget;
+    private final List<IndexTarget.Raw> rawTargets;
     private final IndexPropDefs properties;
     private final boolean ifNotExists;
 
     public CreateIndexStatement(CFName name,
                                 IndexName indexName,
-                                IndexTarget.Raw target,
+                                List<IndexTarget.Raw> targets,
                                 IndexPropDefs properties,
                                 boolean ifNotExists)
     {
         super(name);
         this.indexName = indexName.getIdx();
-        this.rawTarget = target;
+        this.rawTargets = targets;
         this.properties = properties;
         this.ifNotExists = ifNotExists;
     }
@@ -74,24 +74,58 @@ public class CreateIndexStatement extends SchemaAlteringStatement
     public void validate(ClientState state) throws RequestValidationException
     {
         CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+
         if (cfm.isCounter())
             throw new InvalidRequestException("Secondary indexes are not supported on counter tables");
 
-        IndexTarget target = rawTarget.prepare(cfm);
-        ColumnDefinition cd = cfm.getColumnDefinition(target.column);
+        if (cfm.isCompactTable() && !cfm.isStaticCompactTable())
+            throw new InvalidRequestException("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns");
 
-        boolean isMap = cd.type instanceof MapType;
-        boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
+        List<IndexTarget> targets = new ArrayList<>(rawTargets.size());
+        for (IndexTarget.Raw rawTarget : rawTargets)
+            targets.add(rawTarget.prepare(cfm));
 
-        if (isFrozenCollection)
-        {
-            validateForFrozenCollection(target);
-        }
-        else
+        if (targets.isEmpty() && !properties.isCustom)
+            throw new InvalidRequestException("Only CUSTOM indexes can be created without specifying a target column");
+
+        if (targets.size() > 1)
+            validateTargetsForMultiColumnIndex(targets);
+
+        for (IndexTarget target : targets)
         {
-            validateNotFullIndex(target);
-            validateIsSimpleIndexIfTargetColumnNotCollection(cd, target);
-            validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
+            ColumnDefinition cd = cfm.getColumnDefinition(target.column);
+
+            if (cd == null)
+                throw new InvalidRequestException("No column definition found for column " + target.column);
+
+            // TODO: we could lift that limitation
+            if (cfm.isCompactTable() && cd.isPrimaryKeyColumn())
+                throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
+
+            // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
+            // CompositesIndex) and maybe we should, but that means a query like:
+            //     SELECT * FROM foo WHERE static_column = 'bar'
+            // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
+            // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
+            // such indexing is actually useful.
+            if (!cfm.isCompactTable() && cd.isStatic())
+                throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
+
+            if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
+                throw new InvalidRequestException(String.format("Cannot create secondary index on partition key column %s", target.column));
+
+            boolean isMap = cd.type instanceof MapType;
+            boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
+            if (isFrozenCollection)
+            {
+                validateForFrozenCollection(target);
+            }
+            else
+            {
+                validateNotFullIndex(target);
+                validateIsSimpleIndexIfTargetColumnNotCollection(cd, target);
+                validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
+            }
         }
 
         if (!Strings.isNullOrEmpty(indexName))
@@ -106,27 +140,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         }
 
         properties.validate();
-
-        if (cfm.isCompactTable())
-        {
-            if (!cfm.isStaticCompactTable())
-                throw new InvalidRequestException("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns");
-            else if (cd.isPrimaryKeyColumn())
-                // TODO: we could lift that limitation
-                throw new InvalidRequestException("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables");
-        }
-
-        // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
-        // CompositesIndex) and maybe we should, but that means a query like:
-        //     SELECT * FROM foo WHERE static_column = 'bar'
-        // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
-        // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
-        // such indexing is actually useful.
-        if (!cfm.isCompactTable() && cd.isStatic())
-            throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
-
-        if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
-            throw new InvalidRequestException(String.format("Cannot create secondary index on partition key column %s", target.column));
     }
 
     private void validateForFrozenCollection(IndexTarget target) throws InvalidRequestException
@@ -161,15 +174,31 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         }
     }
 
+    private void validateTargetsForMultiColumnIndex(List<IndexTarget> targets)
+    {
+        if (!properties.isCustom)
+            throw new InvalidRequestException("Only CUSTOM indexes support multiple columns");
+
+        Set<ColumnIdentifier> columns = new HashSet<>();
+        for (IndexTarget target : targets)
+            if (!columns.add(target.column))
+                throw new InvalidRequestException("Duplicate column " + target.column + " in index target list");
+    }
+
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily()).copy();
-        IndexTarget target = rawTarget.prepare(cfm);
+        List<IndexTarget> targets = new ArrayList<>(rawTargets.size());
+        for (IndexTarget.Raw rawTarget : rawTargets)
+            targets.add(rawTarget.prepare(cfm));
 
-        ColumnDefinition cd = cfm.getColumnDefinition(target.column);
         String acceptedName = indexName;
         if (Strings.isNullOrEmpty(acceptedName))
-            acceptedName = Indexes.getAvailableIndexName(keyspace(), columnFamily(), cd.name);
+        {
+            acceptedName = Indexes.getAvailableIndexName(keyspace(),
+                                                         columnFamily(),
+                                                         targets.size() == 1 ? targets.get(0).column.toString() : null);
+        }
 
         if (Schema.instance.getKSMetaData(keyspace()).existingIndexNames(null).contains(acceptedName))
         {
@@ -192,7 +221,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             kind = cfm.isCompound() ? IndexMetadata.Kind.COMPOSITES : IndexMetadata.Kind.KEYS;
         }
 
-        IndexMetadata index = IndexMetadata.singleTargetIndex(cfm, target, acceptedName, kind, indexOptions);
+        IndexMetadata index = IndexMetadata.fromIndexTargets(cfm, targets, acceptedName, kind, indexOptions);
 
         // check to disallow creation of an index which duplicates an existing one in all but name
         Optional<IndexMetadata> existingIndex = Iterables.tryFind(cfm.getIndexes(), existing -> existing.equalsWithoutName(index));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 8052e9e..ee9179a 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.schema;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.index.Index;
@@ -115,14 +112,16 @@ public final class IndexMetadata
         return new IndexMetadata(name, options, kind);
     }
 
-    public static IndexMetadata singleTargetIndex(CFMetaData cfm,
-                                                  IndexTarget target,
-                                                  String name,
-                                                  Kind kind,
-                                                  Map<String, String> options)
+    public static IndexMetadata fromIndexTargets(CFMetaData cfm,
+                                                 List<IndexTarget> targets,
+                                                 String name,
+                                                 Kind kind,
+                                                 Map<String, String> options)
     {
         Map<String, String> newOptions = new HashMap<>(options);
-        newOptions.put(IndexTarget.TARGET_OPTION_NAME, target.asCqlString(cfm));
+        newOptions.put(IndexTarget.TARGET_OPTION_NAME, targets.stream()
+                                                              .map(target -> target.asCqlString(cfm))
+                                                              .collect(Collectors.joining(", ")));
         return new IndexMetadata(name, newOptions, kind);
     }
 
@@ -131,10 +130,12 @@ public final class IndexMetadata
         return name != null && !name.isEmpty() && name.matches("\\w+");
     }
 
-    // these will go away as part of #9459 as we enable real per-row indexes
-    public static String getDefaultIndexName(String cfName, ColumnIdentifier columnName)
+    public static String getDefaultIndexName(String cfName, String root)
     {
-        return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
+        if (root == null)
+            return (cfName + "_" + "idx").replaceAll("\\W", "");
+        else
+            return (cfName + "_" + root + "_idx").replaceAll("\\W", "");
     }
 
     public void validate()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 422a94c..49a1d3b 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -23,7 +23,6 @@ import java.util.*;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 
 import static com.google.common.collect.Iterables.filter;
 
@@ -162,12 +161,12 @@ public class Indexes implements Iterable<IndexMetadata>
         return indexesByName.values().toString();
     }
 
-    public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
+    public static String getAvailableIndexName(String ksName, String cfName, String indexNameRoot)
     {
 
         KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
         Set<String> existingNames = ksm == null ? new HashSet<>() : ksm.existingIndexNames(null);
-        String baseName = IndexMetadata.getDefaultIndexName(cfName, columnName);
+        String baseName = IndexMetadata.getDefaultIndexName(cfName, indexNameRoot);
         String acceptedName = baseName;
         int i = 0;
         while (existingNames.contains(acceptedName))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 80b6447..005d6c5 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -551,7 +551,7 @@ public class ThriftConversion
                 String indexName = def.getIndex_name();
                 // add a generated index name if none was supplied
                 if (Strings.isNullOrEmpty(indexName))
-                    indexName = Indexes.getAvailableIndexName(ksName, cfName, column.name);
+                    indexName = Indexes.getAvailableIndexName(ksName, cfName, column.name.toString());
 
                 if (indexNames.contains(indexName))
                     throw new ConfigurationException("Duplicate index name " + indexName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index c4b99c6..f2c01e4 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -299,11 +299,12 @@ public class SchemaLoader
 
         cfm.indexes(
             cfm.getIndexes()
-               .with(IndexMetadata.singleTargetIndex(cfm,
-                                                     new IndexTarget(indexedColumn.name, IndexTarget.Type.VALUES),
-                                                     "indexe1",
-                                                     IndexMetadata.Kind.CUSTOM,
-                                                     indexOptions)));
+               .with(IndexMetadata.fromIndexTargets(cfm,
+                                                    Collections.singletonList(new IndexTarget(indexedColumn.name,
+                                                                                              IndexTarget.Type.VALUES)),
+                                                    "indexe1",
+                                                    IndexMetadata.Kind.CUSTOM,
+                                                    indexOptions)));
         return cfm;
     }
 
@@ -411,12 +412,13 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleTargetIndex(cfm,
-                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
-                                                                         IndexTarget.Type.VALUES),
-                                                         "birthdate_key_index",
-                                                         IndexMetadata.Kind.COMPOSITES,
-                                                         Collections.EMPTY_MAP)));
+                   .with(IndexMetadata.fromIndexTargets(cfm,
+                                                        Collections.singletonList(
+                                                            new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                            IndexTarget.Type.VALUES)),
+                                                        "birthdate_key_index",
+                                                        IndexMetadata.Kind.COMPOSITES,
+                                                        Collections.EMPTY_MAP)));
 
         return cfm.compression(getCompressionParameters());
     }
@@ -433,9 +435,10 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                   .with(IndexMetadata.singleTargetIndex(cfm,
-                                                         new IndexTarget(new ColumnIdentifier("birthdate", true),
-                                                                         IndexTarget.Type.VALUES),
+                   .with(IndexMetadata.fromIndexTargets(cfm,
+                                                        Collections.singletonList(
+                                                            new IndexTarget(new ColumnIdentifier("birthdate", true),
+                                                                            IndexTarget.Type.VALUES)),
                                                          "birthdate_composite_index",
                                                          IndexMetadata.Kind.KEYS,
                                                          Collections.EMPTY_MAP)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
index 70f7f19..a0d64be 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/FrozenCollectionsTest.java
@@ -17,6 +17,15 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -24,14 +33,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.service.StorageService;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -601,8 +602,7 @@ public class FrozenCollectionsTest extends CQLTester
 
         // for now, we don't support indexing values or keys of collections in the primary key
         assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (full(a))", "Cannot create secondary index on partition key column");
-        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create keys() index on frozen column a. " +
-                                                                              "Frozen collections only support full() indexes");
+        assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(a))", "Cannot create secondary index on partition key column");
         assertInvalidIndexCreationWithMessage("CREATE INDEX ON %s (keys(b))", "Cannot create keys() index on frozen column b. " +
                                                                               "Frozen collections only support full() indexes");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 6e51448..8732881 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -169,11 +169,12 @@ public class DirectoriesTest
                                   .addClusteringColumn("col", UTF8Type.instance)
                                   .build();
         ColumnDefinition col = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(PARENT_CFM,
-                                                                 new IndexTarget(col.name, IndexTarget.Type.VALUES),
-                                                                 "idx",
-                                                                 IndexMetadata.Kind.KEYS,
-                                                                 Collections.emptyMap());
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(PARENT_CFM,
+                                           Collections.singletonList(new IndexTarget(col.name, IndexTarget.Type.VALUES)),
+                                           "idx",
+                                           IndexMetadata.Kind.KEYS,
+                                           Collections.emptyMap());
         PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
         CFMetaData INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);
         Directories parentDirectories = new Directories(PARENT_CFM);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index e0fc68a..95f38c1 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -19,9 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -464,12 +462,13 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
-                                                                 new IndexTarget(cd.name, IndexTarget.Type.VALUES),
-                                                                 "test_index",
-                                                                 IndexMetadata.Kind.CUSTOM,
-                                                                 ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                                                 StubIndex.class.getName()));
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(cfs.metadata,
+                                           Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
+                                           "test_index",
+                                           IndexMetadata.Kind.CUSTOM,
+                                           ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+                                                           StubIndex.class.getName()));
 
         if (!cfs.metadata.getIndexes().get("test_index").isPresent())
             cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
@@ -561,12 +560,13 @@ public class RangeTombstoneTest
         cfs.disableAutoCompaction();
 
         ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy();
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
-                                                                 new IndexTarget(cd.name,IndexTarget.Type.VALUES),
-                                                                 "test_index",
-                                                                 IndexMetadata.Kind.CUSTOM,
-                                                                 ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
-                                                                                 StubIndex.class.getName()));
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(cfs.metadata,
+                                           Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)),
+                                           "test_index",
+                                           IndexMetadata.Kind.CUSTOM,
+                                           ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+                                                           StubIndex.class.getName()));
 
         if (!cfs.metadata.getIndexes().get("test_index").isPresent())
             cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index 2a43e33..55ab574 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -438,11 +438,12 @@ public class SecondaryIndexTest
 
         String indexName = "birthdate_index";
         ColumnDefinition old = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("birthdate"));
-        IndexMetadata indexDef = IndexMetadata.singleTargetIndex(cfs.metadata,
-                                                                 new IndexTarget(old.name, IndexTarget.Type.VALUES),
-                                                                 indexName,
-                                                                 IndexMetadata.Kind.COMPOSITES,
-                                                                 Collections.EMPTY_MAP);
+        IndexMetadata indexDef =
+            IndexMetadata.fromIndexTargets(cfs.metadata,
+                                           Collections.singletonList(new IndexTarget(old.name, IndexTarget.Type.VALUES)),
+                                           indexName,
+                                           IndexMetadata.Kind.COMPOSITES,
+                                           Collections.EMPTY_MAP);
         cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef));
         Future<?> future = cfs.indexManager.addIndex(indexDef);
         future.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
new file mode 100644
index 0000000..4497364
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -0,0 +1,365 @@
+package org.apache.cassandra.index;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
+
+import static org.apache.cassandra.Util.throwAssert;
+import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CustomIndexTest extends CQLTester
+{
+    @Test
+    public void testInsertsOnCfsBackedIndex() throws Throwable
+    {
+        // test to ensure that we don't deadlock when flushing CFS backed custom indexers
+        // see CASSANDRA-10181
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
+        createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 1);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 2, 0, 0);
+    }
+
+    @Test
+    public void indexControlsIfIncludedInBuildOnNewSSTables() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, PRIMARY KEY (a))");
+        String toInclude = "include";
+        String toExclude = "exclude";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
+                                  toInclude, IndexIncludedInBuild.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
+                                  toExclude, IndexExcludedFromBuild.class.getName()));
+
+        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 0, 0);
+        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
+        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 2, 2);
+        flush();
+
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        IndexIncludedInBuild included = (IndexIncludedInBuild)indexManager.getIndexByName(toInclude);
+        included.reset();
+        assertTrue(included.rowsInserted.isEmpty());
+
+        IndexExcludedFromBuild excluded = (IndexExcludedFromBuild)indexManager.getIndexByName(toExclude);
+        excluded.reset();
+        assertTrue(excluded.rowsInserted.isEmpty());
+
+        indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
+
+        assertEquals(3, included.rowsInserted.size());
+        assertTrue(excluded.rowsInserted.isEmpty());
+    }
+
+    @Test
+    public void indexReceivesWriteTimeDeletionsCorrectly() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
+        String indexName = "test_index";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(d) USING '%s'",
+                                  indexName, StubIndex.class.getName()));
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 1);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 2, 2);
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 3, 3);
+
+        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
+        StubIndex index = (StubIndex)indexManager.getIndexByName(indexName);
+        assertEquals(4, index.rowsInserted.size());
+        assertTrue(index.partitionDeletions.isEmpty());
+        assertTrue(index.rangeTombstones.isEmpty());
+
+        execute("DELETE FROM %s WHERE a=0 AND b=0");
+        assertTrue(index.partitionDeletions.isEmpty());
+        assertEquals(1, index.rangeTombstones.size());
+
+        execute("DELETE FROM %s WHERE a=0");
+        assertEquals(1, index.partitionDeletions.size());
+        assertEquals(1, index.rangeTombstones.size());
+    }
+    @Test
+    public void nonCustomIndexesRequireExactlyOneTargetColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
+
+        assertInvalidMessage("Only CUSTOM indexes support multiple columns", "CREATE INDEX multi_idx on %s(v1,v2)");
+        assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column",
+                           "CREATE INDEX no_targets on %s()");
+
+        createIndex(String.format("CREATE CUSTOM INDEX multi_idx ON %%s(v1, v2) USING '%s'", StubIndex.class.getName()));
+        assertIndexCreated("multi_idx", "v1", "v2");
+    }
+
+    @Test
+    public void rejectDuplicateColumnsInTargetList() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
+
+        assertInvalidMessage("Duplicate column v1 in index target list",
+                             String.format("CREATE CUSTOM INDEX ON %%s(v1, v1) USING '%s'",
+                                           StubIndex.class.getName()));
+
+        assertInvalidMessage("Duplicate column v1 in index target list",
+                             String.format("CREATE CUSTOM INDEX ON %%s(v1, v1, c, c) USING '%s'",
+                                           StubIndex.class.getName()));
+    }
+
+    @Test
+    public void requireFullQualifierForFrozenCollectionTargets() throws Throwable
+    {
+        // this is really just to prove that we require the full modifier on frozen collection
+        // targets whether the index is multicolumn or not
+        createTable("CREATE TABLE %s(" +
+                    " k int," +
+                    " c int," +
+                    " fmap frozen<map<int, text>>," +
+                    " flist frozen<list<int>>," +
+                    " fset frozen<set<int>>," +
+                    " PRIMARY KEY(k,c))");
+
+        assertInvalidMessage("Cannot create keys() index on frozen column fmap. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, keys(fmap)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create entries() index on frozen column fmap. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, entries(fmap)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create values() index on frozen column fmap. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, fmap) USING'%s'", StubIndex.class.getName()));
+
+        assertInvalidMessage("Cannot create keys() index on frozen column flist. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, keys(flist)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create entries() index on frozen column flist. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, entries(flist)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create values() index on frozen column flist. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, flist) USING'%s'", StubIndex.class.getName()));
+
+        assertInvalidMessage("Cannot create keys() index on frozen column fset. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, keys(fset)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create entries() index on frozen column fset. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, entries(fset)) USING'%s'",
+                                           StubIndex.class.getName()));
+        assertInvalidMessage("Cannot create values() index on frozen column fset. " +
+                             "Frozen collections only support full() indexes",
+                             String.format("CREATE CUSTOM INDEX ON %%s(c, fset) USING'%s'", StubIndex.class.getName()));
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(fmap)) USING'%s'", StubIndex.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(flist)) USING'%s'", StubIndex.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, full(fset)) USING'%s'", StubIndex.class.getName()));
+    }
+
+    @Test
+    public void defaultIndexNameContainsTargetColumns() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(v1, v2) USING '%s'", StubIndex.class.getName()));
+        assertEquals(1, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertIndexCreated(currentTable() + "_idx", "v1", "v2");
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v1, v2) USING '%s'", StubIndex.class.getName()));
+        assertEquals(2, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertIndexCreated(currentTable() + "_idx_1", "c", "v1", "v2");
+
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s'", StubIndex.class.getName()));
+        assertEquals(3, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        assertIndexCreated(currentTable() + "_idx_2", "c", "v2");
+
+        // duplicate the previous index with some additional options and check the name is generated as expected
+        createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c, v2) USING '%s' WITH OPTIONS = {'foo':'bar'}",
+                                  StubIndex.class.getName()));
+        assertEquals(4, getCurrentColumnFamilyStore().metadata.getIndexes().size());
+        Map<String, String> options = new HashMap<>();
+        options.put("foo", "bar");
+        assertIndexCreated(currentTable() + "_idx_3", options, "c", "v2");
+    }
+
+    @Test
+    public void createMultiColumnIndexes() throws Throwable
+    {
+        // smoke test for various permutations of multicolumn indexes
+        createTable("CREATE TABLE %s (" +
+                    " pk1 int," +
+                    " pk2 int," +
+                    " c1 int," +
+                    " c2 int," +
+                    " v1 int," +
+                    " v2 int," +
+                    " mval map<text, int>," +
+                    " lval list<int>," +
+                    " sval set<int>," +
+                    " fmap frozen<map<text,int>>," +
+                    " flist frozen<list<int>>," +
+                    " fset frozen<set<int>>," +
+                    " PRIMARY KEY ((pk1, pk2), c1, c2))");
+
+        testCreateIndex("idx_1", "pk1", "pk2");
+        testCreateIndex("idx_2", "pk1", "c1");
+        testCreateIndex("idx_3", "pk1", "c2");
+        testCreateIndex("idx_4", "c1", "c2");
+        testCreateIndex("idx_5", "c2", "v1");
+        testCreateIndex("idx_6", "v1", "v2");
+        testCreateIndex("idx_7", "pk2", "c2", "v2");
+        testCreateIndex("idx_8", "pk1", "c1", "v1", "mval", "sval", "lval");
+
+        createIndex(String.format("CREATE CUSTOM INDEX inc_frozen ON %%s(" +
+                                  "  pk2, c2, v2, full(fmap), full(fset), full(flist)" +
+                                  ") USING '%s'",
+                                  StubIndex.class.getName()));
+        assertIndexCreated("inc_frozen",
+                           new HashMap<>(),
+                           ImmutableList.of(indexTarget("pk2", IndexTarget.Type.VALUES),
+                                            indexTarget("c2", IndexTarget.Type.VALUES),
+                                            indexTarget("v2", IndexTarget.Type.VALUES),
+                                            indexTarget("fmap", IndexTarget.Type.FULL),
+                                            indexTarget("fset", IndexTarget.Type.FULL),
+                                            indexTarget("flist", IndexTarget.Type.FULL)));
+
+        createIndex(String.format("CREATE CUSTOM INDEX all_teh_things ON %%s(" +
+                                  "  pk1, pk2, c1, c2, v1, v2, keys(mval), lval, sval, full(fmap), full(fset), full(flist)" +
+                                  ") USING '%s'",
+                                  StubIndex.class.getName()));
+        assertIndexCreated("all_teh_things",
+                           new HashMap<>(),
+                           ImmutableList.of(indexTarget("pk1", IndexTarget.Type.VALUES),
+                                            indexTarget("pk2", IndexTarget.Type.VALUES),
+                                            indexTarget("c1", IndexTarget.Type.VALUES),
+                                            indexTarget("c2", IndexTarget.Type.VALUES),
+                                            indexTarget("v1", IndexTarget.Type.VALUES),
+                                            indexTarget("v2", IndexTarget.Type.VALUES),
+                                            indexTarget("mval", IndexTarget.Type.KEYS),
+                                            indexTarget("lval", IndexTarget.Type.VALUES),
+                                            indexTarget("sval", IndexTarget.Type.VALUES),
+                                            indexTarget("fmap", IndexTarget.Type.FULL),
+                                            indexTarget("fset", IndexTarget.Type.FULL),
+                                            indexTarget("flist", IndexTarget.Type.FULL)));
+    }
+
+    @Test
+    public void createMultiColumnIndexIncludingUserTypeColumn() throws Throwable
+    {
+        String myType = KEYSPACE + '.' + createType("CREATE TYPE %s (a int, b int)");
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 frozen<" + myType + ">)");
+        testCreateIndex("udt_idx", "v1", "v2");
+        Indexes indexes = getCurrentColumnFamilyStore().metadata.getIndexes();
+        IndexMetadata expected = IndexMetadata.fromIndexTargets(getCurrentColumnFamilyStore().metadata,
+                                                                ImmutableList.of(indexTarget("v1", IndexTarget.Type.VALUES),
+                                                                                 indexTarget("v2", IndexTarget.Type.VALUES)),
+                                                                "udt_idx",
+                                                                IndexMetadata.Kind.CUSTOM,
+                                                                ImmutableMap.of(CUSTOM_INDEX_OPTION_NAME,
+                                                                                StubIndex.class.getName()));
+        IndexMetadata actual = indexes.get("udt_idx").orElseThrow(throwAssert("Index udt_idx not found"));
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void createIndexWithoutTargets() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");
+        // only allowed for CUSTOM indexes
+        assertInvalidMessage("Only CUSTOM indexes can be created without specifying a target column",
+                             "CREATE INDEX ON %s()");
+
+        // parentheses are mandatory (class name is pre-formatted, as for createIndex below)
+        assertInvalidSyntax(String.format("CREATE CUSTOM INDEX ON %%s USING '%s'", StubIndex.class.getName()));
+        createIndex(String.format("CREATE CUSTOM INDEX no_targets ON %%s() USING '%s'", StubIndex.class.getName()));
+        assertIndexCreated("no_targets", new HashMap<>());
+    }
+
+    private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
+    {
+        // create a StubIndex-backed custom index over the given columns, then verify its metadata
+        String columnList = String.join(",", targetColumnNames);
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
+                                  indexName, columnList, StubIndex.class.getName()));
+        assertIndexCreated(indexName, targetColumnNames);
+    }
+
+    private void assertIndexCreated(String name, String... targetColumnNames)
+    {
+        assertIndexCreated(name, new HashMap<>(), targetColumnNames);
+    }
+
+    private void assertIndexCreated(String name, Map<String, String> options, String... targetColumnNames)
+    {
+        // targets given as plain column names are always built with Type.VALUES
+        List<IndexTarget> targets = new ArrayList<>(targetColumnNames.length);
+        for (String columnName : targetColumnNames)
+            targets.add(indexTarget(columnName, IndexTarget.Type.VALUES));
+        assertIndexCreated(name, options, targets);
+    }
+
+    private void assertIndexCreated(String name, Map<String, String> options, List<IndexTarget> targets)
+    {
+        // all tests here use StubIndex as the custom index class, so add that to the
+        // expected options; work on a copy so the caller's map is never modified
+        Map<String, String> allOptions = new HashMap<>(options);
+        allOptions.put(CUSTOM_INDEX_OPTION_NAME, StubIndex.class.getName());
+        CFMetaData cfm = getCurrentColumnFamilyStore().metadata;
+        IndexMetadata expected = IndexMetadata.fromIndexTargets(cfm, targets, name, IndexMetadata.Kind.CUSTOM, allOptions);
+        for (IndexMetadata actual : cfm.getIndexes())
+            if (actual.equals(expected))
+                return;
+
+        fail(String.format("Index %s not found in CFMetaData", expected));
+    }
+
+    private static IndexTarget indexTarget(String name, IndexTarget.Type type)
+    {
+        return new IndexTarget(ColumnIdentifier.getInterned(name, true), type);
+    }
+
+    public static final class IndexIncludedInBuild extends StubIndex
+    {
+        public IndexIncludedInBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public boolean shouldBuildBlocking()
+        {
+            return true;
+        }
+    }
+
+    public static final class IndexExcludedFromBuild extends StubIndex
+    {
+        public IndexExcludedFromBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public boolean shouldBuildBlocking()
+        {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/df3972e9/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
deleted file mode 100644
index 2f7a10b..0000000
--- a/test/unit/org/apache/cassandra/index/internal/CustomIndexTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.cassandra.index.internal;
-
-import org.junit.Test;
-
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.SecondaryIndexManager;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class CustomIndexTest extends CQLTester
-{
-    @Test
-    public void testInserts() throws Throwable
-    {
-        // test to ensure that we don't deadlock when flushing CFS backed custom indexers
-        // see CASSANDRA-10181
-        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))");
-        createIndex("CREATE CUSTOM INDEX myindex ON %s(c) USING 'org.apache.cassandra.index.internal.CustomCassandraIndex'");
-
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 2);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 1);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 2, 0, 0);
-    }
-
-    @Test
-    public void indexControlsIfIncludedInBuildOnNewSSTables() throws Throwable
-    {
-        createTable("CREATE TABLE %s (a int, b int, PRIMARY KEY (a))");
-        String toInclude = "include";
-        String toExclude = "exclude";
-        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
-                                  toInclude, IndexIncludedInBuild.class.getName()));
-        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(b) USING '%s'",
-                                  toExclude, IndexExcludedFromBuild.class.getName()));
-
-        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 0, 0);
-        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 1, 1);
-        execute("INSERT INTO %s (a, b) VALUES (?, ?)", 2, 2);
-        flush();
-
-        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
-        IndexIncludedInBuild included = (IndexIncludedInBuild)indexManager.getIndexByName(toInclude);
-        included.reset();
-        assertTrue(included.rowsInserted.isEmpty());
-
-        IndexExcludedFromBuild excluded = (IndexExcludedFromBuild)indexManager.getIndexByName(toExclude);
-        excluded.reset();
-        assertTrue(excluded.rowsInserted.isEmpty());
-
-        indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
-
-        assertEquals(3, included.rowsInserted.size());
-        assertTrue(excluded.rowsInserted.isEmpty());
-    }
-
-    @Test
-    public void indexReceivesWriteTimeDeletionsCorrectly() throws Throwable
-    {
-        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
-        String indexName = "test_index";
-        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(d) USING '%s'",
-                                  indexName, StubIndex.class.getName()));
-
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 1);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 2, 2);
-        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 3, 3);
-
-        SecondaryIndexManager indexManager = getCurrentColumnFamilyStore().indexManager;
-        StubIndex index = (StubIndex)indexManager.getIndexByName(indexName);
-        assertEquals(4, index.rowsInserted.size());
-        assertTrue(index.partitionDeletions.isEmpty());
-        assertTrue(index.rangeTombstones.isEmpty());
-
-        execute("DELETE FROM %s WHERE a=0 AND b=0");
-        assertTrue(index.partitionDeletions.isEmpty());
-        assertEquals(1, index.rangeTombstones.size());
-
-        execute("DELETE FROM %s WHERE a=0");
-        assertEquals(1, index.partitionDeletions.size());
-        assertEquals(1, index.rangeTombstones.size());
-    }
-
-
-    public static final class IndexIncludedInBuild extends StubIndex
-    {
-        public IndexIncludedInBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public boolean shouldBuildBlocking()
-        {
-            return true;
-        }
-    }
-
-    public static final class IndexExcludedFromBuild extends StubIndex
-    {
-        public IndexExcludedFromBuild(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public boolean shouldBuildBlocking()
-        {
-            return false;
-        }
-    }
-}