You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:33 UTC

[02/15] cassandra git commit: New 2i API and implementations for built in indexes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java b/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java
new file mode 100644
index 0000000..f1dc3af
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on a REGULAR column definition on a composite type.
+ *
+ * A cell indexed by this index will have the general form:
+ *   ck_0 ... ck_n c_name : v
+ * where ck_i are the cluster keys, c_name the last component of the cell
+ * composite name (or second to last if collections are in use, but this
+ * has no impact) and v the cell value.
+ *
+ * Such a cell is indexed if c_name == columnDef.name, and it will generate
+ * (makeIndexColumnName()) an index entry whose:
+ *   - row key will be the value v (getIndexedValue()).
+ *   - cell name will
+ *       rk ck_0 ... ck_n
+ *     where rk is the row key of the initial cell. I.e. the index entry store
+ *     all the information require to locate back the indexed cell.
+ */
+public class RegularColumnIndex extends CassandraIndex
+{
+    public RegularColumnIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path,
+                                      ByteBuffer cellValue)
+    {
+        return cellValue;
+    }
+
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(partitionKey);
+        for (int i = 0; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+
+        return builder;
+    }
+
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        Clustering clustering = indexEntry.clustering();
+        ClusteringComparator baseComparator = baseCfs.getComparator();
+        CBuilder builder = CBuilder.create(baseComparator);
+        for (int i = 0; i < baseComparator.size(); i++)
+            builder.add(clustering.get(i + 1));
+
+        return new IndexEntry(indexedValue,
+                                clustering,
+                                indexEntry.primaryKeyLivenessInfo().timestamp(),
+                                clustering.get(0),
+                                builder.build());
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        Cell cell = data.getCell(indexedColumn);
+        return cell == null
+            || !cell.isLive(nowInSec)
+            || indexedColumn.type.compare(indexValue, cell.value()) != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
new file mode 100644
index 0000000..53ecd01
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
@@ -0,0 +1,62 @@
+package org.apache.cassandra.index.internal.keys;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+public class KeysIndex extends CassandraIndex
+{
+    public KeysIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                        CFMetaData baseMetadata,
+                                                        ColumnDefinition cfDef)
+    {
+        // no additional clustering columns required
+        return builder;
+    }
+
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(partitionKey);
+        return builder;
+    }
+
+    protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        return cellValue;
+    }
+
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
+    }
+
+    public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
+    {
+        if (row == null)
+            return true;
+
+        Cell cell = row.getCell(indexedColumn);
+
+        return (cell == null
+             || !cell.isLive(nowInSec)
+             || indexedColumn.type.compare(indexValue, cell.value()) != 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
new file mode 100644
index 0000000..b60d2d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.keys;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.CassandraIndexSearcher;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class KeysSearcher extends CassandraIndexSearcher
+{
+    private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
+
+    public KeysSearcher(ReadCommand command,
+                        RowFilter.Expression expression,
+                        CassandraIndex indexer)
+    {
+        super(command, expression, indexer);
+    }
+
+    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
+                                                             final RowIterator indexHits,
+                                                             final ReadCommand command,
+                                                             final ReadOrderGroup orderGroup)
+    {
+        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
+
+        return new UnfilteredPartitionIterator()
+        {
+            private UnfilteredRowIterator next;
+
+            public boolean isForThrift()
+            {
+                return command.isForThrift();
+            }
+
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
+            public boolean hasNext()
+            {
+                return prepareNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                if (next == null)
+                    prepareNext();
+
+                UnfilteredRowIterator toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            private boolean prepareNext()
+            {
+                while (next == null && indexHits.hasNext())
+                {
+                    Row hit = indexHits.next();
+                    DecoratedKey key = index.baseCfs.decorateKey(hit.clustering().get(0));
+
+                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+                                                                                           index.baseCfs.metadata,
+                                                                                           command.nowInSec(),
+                                                                                           command.columnFilter(),
+                                                                                           command.rowFilter(),
+                                                                                           DataLimits.NONE,
+                                                                                           key,
+                                                                                           command.clusteringIndexFilter(key));
+
+                    @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
+                                                  // Otherwise, we close right away if empty, and if it's assigned to next it will be called either
+                                                  // by the next caller of next, or through closing this iterator is this come before.
+                    UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs,
+                                                                                                orderGroup.baseReadOpOrderGroup()),
+                                                                   hit,
+                                                                   indexKey.getKey(),
+                                                                   orderGroup.writeOpOrderGroup(),
+                                                                   isForThrift(),
+                                                                   command.nowInSec());
+
+                    if (dataIter != null)
+                    {
+                        if (dataIter.isEmpty())
+                            dataIter.close();
+                        else
+                            next = dataIter;
+                    }
+                }
+                return next != null;
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                indexHits.close();
+                if (next != null)
+                    next.close();
+            }
+        };
+    }
+
+    private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
+                                                Row indexHit,
+                                                ByteBuffer indexedValue,
+                                                OpOrder.Group writeOp,
+                                                boolean isForThrift,
+                                                int nowInSec)
+    {
+        if (isForThrift)
+        {
+            // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering
+            // is the indexed name. Ans so we need to materialize the partition.
+            ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
+            iterator.close();
+            Row data = result.getRow(new Clustering(index.getIndexedColumn().name.bytes));
+
+            // for thrift tables, we need to compare the index entry against the compact value column,
+            // not the column actually designated as the indexed column so we don't use the index function
+            // lib for the staleness check like we do in every other case
+            Cell baseData = data.getCell(index.baseCfs.metadata.compactValueColumn());
+            if (baseData == null || !baseData.isLive(nowInSec) || index.getIndexedColumn().type.compare(indexedValue, baseData.value()) != 0)
+            {
+                // Index is stale, remove the index entry and ignore
+                index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
+                                         new Clustering(index.getIndexedColumn().name.bytes),
+                                         new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+                                         writeOp);
+                return null;
+            }
+            else
+            {
+                return result.unfilteredIterator();
+            }
+        }
+        else
+        {
+            assert iterator.metadata().isCompactTable();
+            Row data = iterator.staticRow();
+            if (index.isStale(data, indexedValue, nowInSec))
+            {
+                // Index is stale, remove the index entry and ignore
+                index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
+                                         makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
+                                         new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+                                         writeOp);
+                iterator.close();
+                return null;
+            }
+            else
+            {
+                return iterator;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java b/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java
new file mode 100644
index 0000000..1d6ba56
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.Row;
+
+/**
+ * Performs garbage collection of index entries during a cleanup.
+ *
+ * Notifies registered indexers of each partition being removed and
+ *
+ * Compaction & Cleanup are somewhat simpler than dealing with incoming writes,
+ * being only concerned with cleaning up stale index entries.
+ *
+ * When multiple versions of a row are compacted, the CleanupTransaction is
+ * notified of the versions being merged, which it diffs against the merge result
+ * and forwards to the registered Index.Indexer instances when on commit.
+ *
+ * Instances are currently scoped to a single row within a partition, but this could be improved to batch process
+ * multiple rows within a single partition.
+ */
+public interface CleanupTransaction extends IndexTransaction
+{
+
+    void onPartitionDeletion(DeletionTime deletionTime);
+    void onRowDelete(Row row);
+
+    CleanupTransaction NO_OP = new CleanupTransaction()
+    {
+        public void start(){}
+        public void onPartitionDeletion(DeletionTime deletionTime){}
+        public void onRowDelete(Row row){}
+        public void commit(){}
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java b/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java
new file mode 100644
index 0000000..a9fbf41
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.rows.Row;
+
+/**
+ * Performs garbage collection of stale index entries during a regular compaction.
+ *
+ * A CompactionTransaction is concerned with cleaning up stale index entries.
+ * When multiple versions of a row are compacted, the CompactionTransaction is
+ * notified of the versions being merged, which it diffs against the merge result.
+ *
+ * Instances are currently scoped to a single row within a partition, but this could be improved to batch process
+ * multiple rows within a single partition.
+ */
+public interface CompactionTransaction extends IndexTransaction
+{
+    void onRowMerge(Columns columns, Row merged, Row...versions);
+
+    CompactionTransaction NO_OP = new CompactionTransaction()
+    {
+        public void start(){}
+        public void onRowMerge(Columns columns, Row merged, Row...versions){}
+        public void commit(){}
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java b/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java
new file mode 100644
index 0000000..3fb8235
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+/**
+ * Base interface for the handling of index updates.
+ * There are 3 types of transaction where indexes are updated to stay in sync with the base table, each represented by
+ * a subinterface:
+ * * {@code UpdateTransaction}
+ *   Used on the regular write path and when indexing newly acquired SSTables from streaming or sideloading. This type
+ *   of transaction may include both row inserts and updates to rows previously existing in the base Memtable. Instances
+ *   are scoped to a single partition update and are obtained from the factory method
+ *   @{code SecondaryIndexManager#newUpdateTransaction}
+ *
+ * * {@code CompactionTransaction}
+ *   Used during compaction when stale entries which have been superceded are cleaned up from the index. As rows in a
+ *   partition are merged during the compaction, index entries for any purged rows are cleaned from the index to
+ *   compensate for the fact that they may not have been removed at write time if the data in the base table had been
+ *   already flushed to disk (and so was processed as an insert, not an update by the UpdateTransaction). These
+ *   transactions are currently scoped to a single row within a partition, but this could be improved to batch process
+ *   multiple rows within a single partition.
+ *
+ * * @{code CleanupTransaction}
+ *   During cleanup no merging is required, the only thing to do is to notify indexes of the partitions being removed,
+ *   along with the rows within those partitions. Like with compaction, these transactions are currently scoped to a
+ *   single row within a partition, but this could be improved with batching.
+ */
+public interface IndexTransaction
+{
+    /**
+     * Used to differentiate between type of index transaction when obtaining
+     * a handler from Index implementations.
+     */
+    public enum Type
+    {
+        UPDATE, COMPACTION, CLEANUP
+    }
+
+    void start();
+    void commit();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java b/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java
new file mode 100644
index 0000000..c78304a
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.rows.Row;
+
+/**
+ * Handling of index updates on the write path.
+ *
+ * Instances of an UpdateTransaction are scoped to a single partition update
+ * A new instance is used for every write, obtained from the
+ * newUpdateTransaction(PartitionUpdate) method. Likewise, a single
+ * CleanupTransaction instance is used for each partition processed during a
+ * compaction or cleanup.
+ *
+ * We make certain guarantees about the lifecycle of each UpdateTransaction
+ * instance. Namely that start() will be called before any other method, and
+ * commit() will be called at the end of the update.
+ * Each instance is initialized with 1..many Index.Indexer instances, one per
+ * registered Index. As with the transaction itself, these are scoped to a
+ * specific partition update, so implementations can be assured that all indexing
+ * events they receive relate to the same logical operation.
+ *
+ * onPartitionDelete(), onRangeTombstone(), onInserted() and onUpdated()
+ * calls may arrive in any order, but this should have no impact for the
+ * Indexers being notified as any events delivered to a single instance
+ * necessarily relate to a single partition.
+ *
+ * The typical sequence of events during a Memtable update would be:
+ * start()                       -- no-op, used to notify Indexers of the start of the transaction
+ * onPartitionDeletion(dt)       -- if the PartitionUpdate implies one
+ * onRangeTombstone(rt)*         -- for each in the PartitionUpdate, if any
+ *
+ * then:
+ * onInserted(row)*              -- called for each Row not already present in the Memtable
+ * onUpdated(existing, updated)* -- called for any Row in the update for where a version was already present
+ *                                  in the Memtable. It's important to note here that existing is the previous
+ *                                  row from the Memtable & updated is the final version replacing it. It is
+ *                                  *not* the incoming row, but the result of merging the incoming and existing
+ *                                  rows.
+ * commit()                      -- finally, finish is called when the new Partition is swapped into the Memtable
+ */
+public interface UpdateTransaction extends IndexTransaction
+{
+    void onPartitionDeletion(DeletionTime deletionTime);
+    void onRangeTombstone(RangeTombstone rangeTombstone);
+    void onInserted(Row row);
+    void onUpdated(Row existing, Row updated);
+
+    UpdateTransaction NO_OP = new UpdateTransaction()
+    {
+        public void start(){}
+        public void onPartitionDeletion(DeletionTime deletionTime){}
+        public void onRangeTombstone(RangeTombstone rangeTombstone){}
+        public void onInserted(Row row){}
+        public void onUpdated(Row existing, Row updated){}
+        public void commit(){}
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 5908594..9f68cea 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -40,14 +40,18 @@ import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.lifecycle.TransactionLog;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
@@ -340,8 +344,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
             IndexMetadata def = parent.getIndexes()
                                       .get(indexName)
-                                      .orElseThrow(() -> new AssertionError("Could not find index metadata for index cf " + i));
-            metadata = SecondaryIndex.newIndexMetadata(parent, def);
+                                      .orElseThrow(() -> new AssertionError(
+                                                                           "Could not find index metadata for index cf " + i));
+            metadata = CassandraIndex.indexCfsMetadata(parent, def);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index af07cee..40a75c6 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.schema;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -25,20 +26,26 @@ import java.util.Set;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * An immutable representation of secondary index metadata.
  */
 public final class IndexMetadata
 {
+    private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class);
+
     public enum IndexType
     {
         KEYS, CUSTOM, COMPOSITES
@@ -68,20 +75,20 @@ public final class IndexMetadata
         this.columns = columns == null ? ImmutableSet.of() : ImmutableSet.copyOf(columns);
     }
 
-    public static IndexMetadata legacyIndex(ColumnIdentifier column,
-                                              String name,
-                                              IndexType type,
-                                              Map<String, String> options)
+    public static IndexMetadata singleColumnIndex(ColumnIdentifier column,
+                                                  String name,
+                                                  IndexType type,
+                                                  Map<String, String> options)
     {
         return new IndexMetadata(name, options, type, TargetType.COLUMN, Collections.singleton(column));
     }
 
-    public static IndexMetadata legacyIndex(ColumnDefinition column,
-                                              String name,
-                                              IndexType type,
-                                              Map<String, String> options)
+    public static IndexMetadata singleColumnIndex(ColumnDefinition column,
+                                                  String name,
+                                                  IndexType type,
+                                                  Map<String, String> options)
     {
-        return legacyIndex(column.name, name, type, options);
+        return singleColumnIndex(column.name, name, type, options);
     }
 
     public static boolean isNameValid(String name)
@@ -107,11 +114,54 @@ public final class IndexMetadata
             throw new ConfigurationException("Target type is null for index " + name);
 
         if (indexType == IndexMetadata.IndexType.CUSTOM)
-            if (options == null || !options.containsKey(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME))
+        {
+            if (options == null || !options.containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
                 throw new ConfigurationException(String.format("Required option missing for index %s : %s",
-                                                               name, SecondaryIndex.CUSTOM_INDEX_OPTION_NAME));
+                                                               name, IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+            String className = options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+            Class<Index> indexerClass = FBUtilities.classForName(className, "custom indexer");
+            if(!Index.class.isAssignableFrom(indexerClass))
+                throw new ConfigurationException(String.format("Specified Indexer class (%s) does not implement the Indexer interface", className));
+            validateCustomIndexOptions(indexerClass, options);
+        }
+    }
+
+    private void validateCustomIndexOptions(Class<? extends Index> indexerClass, Map<String, String> options) throws ConfigurationException
+    {
+        try
+        {
+            Map<String, String> filteredOptions =
+                Maps.filterKeys(options,key -> !key.equals(IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+
+            if (filteredOptions.isEmpty())
+                return;
+
+            Map<?,?> unknownOptions = (Map) indexerClass.getMethod("validateOptions", Map.class).invoke(null, filteredOptions);
+            if (!unknownOptions.isEmpty())
+                throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), indexerClass.getSimpleName()));
+        }
+        catch (NoSuchMethodException e)
+        {
+            logger.info("Indexer {} does not have a static validateOptions method. Validation ignored",
+                        indexerClass.getName());
+        }
+        catch (InvocationTargetException e)
+        {
+            if (e.getTargetException() instanceof ConfigurationException)
+                throw (ConfigurationException) e.getTargetException();
+            throw new ConfigurationException("Failed to validate custom indexer options: " + options);
+        }
+        catch (ConfigurationException e)
+        {
+            throw e;
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Failed to validate custom indexer options: " + options);
+        }
     }
 
+    // to be removed in CASSANDRA-10124 with multi-target & row based indexes
     public ColumnDefinition indexedColumn(CFMetaData cfm)
     {
        return cfm.getColumnDefinition(columns.iterator().next());
@@ -132,11 +182,29 @@ public final class IndexMetadata
         return indexType == IndexType.COMPOSITES;
     }
 
+    public boolean isRowIndex()
+    {
+        return targetType == TargetType.ROW;
+    }
+
+    public boolean isColumnIndex()
+    {
+        return targetType == TargetType.COLUMN;
+    }
+
     public int hashCode()
     {
         return Objects.hashCode(name, indexType, targetType, options, columns);
     }
 
+    public boolean equalsWithoutName(IndexMetadata other)
+    {
+        return Objects.equal(indexType, other.indexType)
+            && Objects.equal(targetType, other.targetType)
+            && Objects.equal(columns, other.columns)
+            && Objects.equal(options, other.options);
+    }
+
     public boolean equals(Object obj)
     {
         if (obj == this)
@@ -147,11 +215,7 @@ public final class IndexMetadata
 
         IndexMetadata other = (IndexMetadata)obj;
 
-        return Objects.equal(name, other.name)
-            && Objects.equal(indexType, other.indexType)
-            && Objects.equal(targetType, other.targetType)
-            && Objects.equal(options, other.options)
-            && Objects.equal(columns, other.columns);
+        return Objects.equal(name, other.name) && equalsWithoutName(other);
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 7c930b3..6227e0b 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.schema;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Schema;
@@ -39,17 +40,13 @@ import static com.google.common.collect.Iterables.filter;
  */
 public class Indexes implements Iterable<IndexMetadata>
 {
-    // lookup for index by target column
-    private final ImmutableMap<ColumnIdentifier, IndexMetadata> indexes;
+    private final ImmutableMap<String, IndexMetadata> indexes;
+    private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
 
     private Indexes(Builder builder)
     {
-        ImmutableMap.Builder<ColumnIdentifier, IndexMetadata> internalBuilder = ImmutableMap.builder();
-        builder.indexes.build()
-                       .values()
-                       .stream()
-                       .forEach(def -> internalBuilder.put(def.columns.iterator().next(), def));
-        indexes = internalBuilder.build();
+        indexes = builder.indexes.build();
+        indexesByColumn = builder.indexesByColumn.build();
     }
 
     public static Builder builder()
@@ -105,9 +102,9 @@ public class Indexes implements Iterable<IndexMetadata>
      * @param column a column definition for which an {@link IndexMetadata} is being sought
      * @return an empty {@link Optional} if the named index is not found; a non-empty optional of {@link IndexMetadata} otherwise
      */
-    public Optional<IndexMetadata> get(ColumnDefinition column)
+    public Collection<IndexMetadata> get(ColumnDefinition column)
     {
-        return Optional.ofNullable(indexes.get(column.name));
+        return indexesByColumn.get(column.name);
     }
 
     /**
@@ -117,7 +114,7 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public boolean hasIndexFor(ColumnDefinition column)
     {
-        return indexes.get(column.name) != null;
+        return !indexesByColumn.get(column.name).isEmpty();
     }
 
     /**
@@ -183,6 +180,7 @@ public class Indexes implements Iterable<IndexMetadata>
     public static final class Builder
     {
         final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+        final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
 
         private Builder()
         {
@@ -196,6 +194,13 @@ public class Indexes implements Iterable<IndexMetadata>
         public Builder add(IndexMetadata index)
         {
             indexes.put(index.name, index);
+            // All indexes are column indexes at the moment
+            if (index.isColumnIndex())
+            {
+                for (ColumnIdentifier target : index.columns)
+                    indexesByColumn.put(target, index);
+
+            }
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index c8b566c..7c0eadf 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -27,7 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
@@ -628,7 +630,7 @@ public final class LegacySchemaMigrator
                                                                 isStaticCompactTable,
                                                                 needsUpgrade);
 
-            indexes.add(IndexMetadata.legacyIndex(column, indexName, indexType, indexOptions));
+            indexes.add(IndexMetadata.singleColumnIndex(column, indexName, indexType, indexOptions));
         }
 
         return indexes.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index d35ef82..acd9e93 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.partitions.*;
@@ -1427,7 +1426,7 @@ public final class SchemaKeyspace
                     findColumnIdentifierWithName(targetColumnName, cfm.allColumns()).ifPresent(targetColumns::add);
             });
         }
-        return IndexMetadata.legacyIndex(targetColumns.iterator().next(), name, type, options);
+        return IndexMetadata.singleColumnIndex(targetColumns.iterator().next(), name, type, options);
     }
 
     private static Optional<ColumnIdentifier> findColumnIdentifierWithName(String name,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 3dc323e..cd69ef3 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -26,14 +26,12 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -237,7 +235,7 @@ public class DataResolver extends ResponseResolver
                 if (merged.isEmpty())
                     return;
 
-                Rows.diff(merged, columns, versions, diffListener);
+                Rows.diff(diffListener, merged, columns, versions);
                 for (int i = 0; i < currentRows.length; i++)
                 {
                     if (currentRows[i] != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 12c2c24..9d999ee 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,11 +31,6 @@ import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.metrics.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,31 +41,31 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.RingPosition;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.db.view.MaterializedViewUtils;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.locator.*;
+import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
-import org.apache.cassandra.service.paxos.*;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PrepareCallback;
+import org.apache.cassandra.service.paxos.ProposeCallback;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.*;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -1698,11 +1693,10 @@ public class StorageProxy implements StorageProxyMBean
     private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
     {
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
-        SecondaryIndexSearcher searcher = cfs.indexManager.getBestIndexSearcherFor(command);
-
-        float maxExpectedResults = searcher == null
+        Index index = cfs.indexManager.getBestIndexFor(command);
+        float maxExpectedResults = index == null
                                  ? command.limits().estimateTotalResults(cfs)
-                                 : searcher.highestSelectivityIndex(command.rowFilter()).estimateResultRows();
+                                 : index.getEstimatedResultRows();
 
         // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
@@ -1962,7 +1956,9 @@ public class StorageProxy implements StorageProxyMBean
             }
 
             Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
-            return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), command.limits(), command.nowInSec());
+            // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
+            // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
+            return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec());
         }
 
         public void close()
@@ -2003,7 +1999,8 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
 
         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
-        return command.postReconciliationProcessing(command.limits().filter(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel), command.nowInSec()));
+
+        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
     }
 
     public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 2bcbbc1..52c8884 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,10 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -31,21 +28,17 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.RowIterators;
-import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
-
 import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
@@ -176,7 +169,7 @@ public class StreamReceiveTask extends StreamTask
 
                         // add sstables and build secondary indexes
                         cfs.addSSTables(readers);
-                        cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
+                        cfs.indexManager.buildAllIndexesBlocking(readers);
                     }
                 }
                 catch (Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 7a2126c..fd391aa 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -38,12 +38,12 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.*;
@@ -826,6 +826,9 @@ public class CassandraServer implements Cassandra.Iface
             Cell cell = cellFromColumn(metadata, name, column);
             PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
 
+            // Indexed column values cannot be larger than 64K.  See CASSANDRA-3057/4240 for more details
+            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);
+
             mutation = new org.apache.cassandra.db.Mutation(update);
         }
         catch (MarshalException|UnknownColumnException e)
@@ -916,6 +919,8 @@ public class CassandraServer implements Cassandra.Iface
             int nowInSec = FBUtilities.nowInSeconds();
 
             PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
+            // Indexed column values cannot be larger than 64K.  See CASSANDRA-3057/4240 for more details
+            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates);
 
             FilteredPartition partitionExpected = null;
             if (!expected.isEmpty())
@@ -1121,6 +1126,9 @@ public class CassandraServer implements Cassandra.Iface
                 DecoratedKey dk = metadata.decorateKey(key);
                 PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
 
+                // Indexed column values cannot be larger than 64K.  See CASSANDRA-3057/4240 for more details
+                Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);
+
                 org.apache.cassandra.db.Mutation mutation;
                 if (metadata.isCounter())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index a74bcea..4c17a4b 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -23,21 +23,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.LegacyLayout;
 import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.schema.*;
@@ -560,10 +556,10 @@ public class ThriftConversion
                 Map<String, String> indexOptions = def.getIndex_options();
                 IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(def.index_type.name());
 
-                indexes.add(IndexMetadata.legacyIndex(column,
-                                                      indexName,
-                                                      indexType,
-                                                      indexOptions));
+                indexes.add(IndexMetadata.singleColumnIndex(column,
+                                                            indexName,
+                                                            indexType,
+                                                            indexOptions));
             }
         }
         return indexes.build();
@@ -576,13 +572,22 @@ public class ThriftConversion
 
         cd.setName(ByteBufferUtil.clone(column.name.bytes));
         cd.setValidation_class(column.type.toString());
-        Optional<IndexMetadata> index = cfMetaData.getIndexes().get(column);
-        index.ifPresent(def -> {
-            cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(def.indexType.name()));
-            cd.setIndex_name(def.name);
-            cd.setIndex_options(def.options == null || def.options.isEmpty() ? null : Maps.newHashMap(def.options));
-        });
-
+        Collection<IndexMetadata> indexes = cfMetaData.getIndexes().get(column);
+        // we include the index in the ColumnDef iff
+        //   * it is the only index on the column
+        //   * it is the only target column for the index
+        if (indexes.size() == 1)
+        {
+            IndexMetadata index = indexes.iterator().next();
+            if (index.columns.size() == 1)
+            {
+                cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(index.indexType.name()));
+                cd.setIndex_name(index.name);
+                cd.setIndex_options(index.options == null || index.options.isEmpty()
+                                    ? null
+                                    : Maps.newHashMap(index.options));
+            }
+        }
         return cd;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 054b466..71aa335 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -18,18 +18,24 @@
 package org.apache.cassandra.thrift;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -449,8 +455,6 @@ public class ThriftValidation
             LegacyLayout.LegacyCellName cn = LegacyLayout.decodeCellName(metadata, scName, column.name);
             cn.column.validateCellValue(column.value);
 
-            // Indexed column values cannot be larger than 64K.  See CASSANDRA-3057/4240 for more details
-            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(cn.column, column.value, null);
         }
         catch (UnknownColumnException e)
         {
@@ -609,7 +613,8 @@ public class ThriftValidation
                                                                                   me.getMessage()));
             }
 
-            isIndexed |= (expression.op == IndexOperator.EQ) && idxManager.indexes(def);
+            for(Index index : idxManager.listIndexes())
+                isIndexed |= index.supportsExpression(def, Operator.valueOf(expression.op.name()));
         }
 
         return isIndexed;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 1c0dd76..8675d7f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -27,15 +27,15 @@ import org.junit.BeforeClass;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.index.PerRowSecondaryIndexTest;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.schema.*;
+import org.apache.cassandra.index.StubIndex;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -218,10 +218,6 @@ public class SchemaLoader
         schema.add(KeyspaceMetadata.create(ks_nocommit, KeyspaceParams.simpleTransient(1), Tables.of(
                 standardCFMD(ks_nocommit, "Standard1"))));
 
-        // PerRowSecondaryIndexTest
-        schema.add(KeyspaceMetadata.create(ks_prsi, KeyspaceParams.simple(1), Tables.of(
-                perRowIndexedCFMD(ks_prsi, "Indexed1"))));
-
         // CQLKeyspace
         schema.add(KeyspaceMetadata.create(ks_cql, KeyspaceParams.simple(1), Tables.of(
 
@@ -291,8 +287,8 @@ public class SchemaLoader
     public static CFMetaData perRowIndexedCFMD(String ksName, String cfName)
     {
         final Map<String, String> indexOptions = Collections.singletonMap(
-                                                      SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
-                                                      PerRowSecondaryIndexTest.TestIndex.class.getName());
+                                                      IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+                                                      StubIndex.class.getName());
 
         CFMetaData cfm =  CFMetaData.Builder.create(ksName, cfName)
                 .addPartitionKey("key", AsciiType.instance)
@@ -303,10 +299,10 @@ public class SchemaLoader
 
         cfm.indexes(
             cfm.getIndexes()
-               .with(IndexMetadata.legacyIndex(indexedColumn,
-                                               "indexe1",
-                                               IndexMetadata.IndexType.CUSTOM,
-                                               indexOptions)));
+               .with(IndexMetadata.singleColumnIndex(indexedColumn,
+                                                     "indexe1",
+                                                     IndexMetadata.IndexType.CUSTOM,
+                                                     indexOptions)));
         return cfm;
     }
 
@@ -414,10 +410,10 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                    .with(IndexMetadata.legacyIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
-                                                    "birthdate_key_index",
-                                                    IndexMetadata.IndexType.COMPOSITES,
-                                                    Collections.EMPTY_MAP)));
+                   .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                                                         "birthdate_key_index",
+                                                         IndexMetadata.IndexType.COMPOSITES,
+                                                         Collections.EMPTY_MAP)));
 
         return cfm.compression(getCompressionParameters());
     }
@@ -434,10 +430,10 @@ public class SchemaLoader
         if (withIndex)
             cfm.indexes(
                 cfm.getIndexes()
-                    .with(IndexMetadata.legacyIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
-                                                    "birthdate_composite_index",
-                                                    IndexMetadata.IndexType.KEYS,
-                                                    Collections.EMPTY_MAP)));
+                   .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+                                                         "birthdate_composite_index",
+                                                         IndexMetadata.IndexType.KEYS,
+                                                         Collections.EMPTY_MAP)));
 
 
         return cfm.compression(getCompressionParameters());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e8451e0..5e19d5e 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -32,7 +32,6 @@ import java.util.function.Supplier;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -40,13 +39,15 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
@@ -531,4 +532,10 @@ public class Util
     {
         thread.join(10000);
     }
+
+    // for use with Optional in tests, can be used as an argument to orElseThrow
+    public static Supplier<AssertionError> throwAssert(final String message)
+    {
+        return () -> new AssertionError(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index be3568a..02b2abd 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -18,20 +18,24 @@
 package org.apache.cassandra.cql3.validation.entities;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
 import org.apache.commons.lang3.StringUtils;
-
 import org.junit.Test;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.index.StubIndex;
 
+import static org.apache.cassandra.Util.throwAssert;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -70,13 +74,23 @@ public class SecondaryIndexTest extends CQLTester
     private void testCreateAndDropIndex(String indexName, boolean addKeyspaceOnDrop) throws Throwable
     {
         execute("USE system");
-        assertInvalidMessage("Index '" + removeQuotes(indexName.toLowerCase(Locale.US)) + "' could not be found", "DROP INDEX " + indexName + ";");
+        assertInvalidMessage(String.format("Index '%s' could not be found",
+                                           removeQuotes(indexName.toLowerCase(Locale.US))),
+                             "DROP INDEX " + indexName + ";");
 
         createTable("CREATE TABLE %s (a int primary key, b int);");
         createIndex("CREATE INDEX " + indexName + " ON %s(b);");
         createIndex("CREATE INDEX IF NOT EXISTS " + indexName + " ON %s(b);");
 
-        assertInvalidMessage("Index already exists", "CREATE INDEX " + indexName + " ON %s(b)");
+        assertInvalidMessage(String.format("Index %s already exists",
+                                           removeQuotes(indexName.toLowerCase(Locale.US))),
+                             "CREATE INDEX " + indexName + " ON %s(b)");
+
+        String otherIndexName = "index_" + System.nanoTime();
+        assertInvalidMessage(String.format("Index %s is a duplicate of existing index %s",
+                                           removeQuotes(otherIndexName.toLowerCase(Locale.US)),
+                                           removeQuotes(indexName.toLowerCase(Locale.US))),
+                             "CREATE INDEX " + otherIndexName + " ON %s(b)");
 
         execute("INSERT INTO %s (a, b) values (?, ?);", 0, 0);
         execute("INSERT INTO %s (a, b) values (?, ?);", 1, 1);
@@ -84,7 +98,8 @@ public class SecondaryIndexTest extends CQLTester
         execute("INSERT INTO %s (a, b) values (?, ?);", 3, 1);
 
         assertRows(execute("SELECT * FROM %s where b = ?", 1), row(1, 1), row(3, 1));
-        assertInvalidMessage("Index '" + removeQuotes(indexName.toLowerCase(Locale.US)) + "' could not be found in any of the tables of keyspace 'system'",
+        assertInvalidMessage(String.format("Index '%s' could not be found in any of the tables of keyspace 'system'",
+                                           removeQuotes(indexName.toLowerCase(Locale.US))),
                              "DROP INDEX " + indexName);
 
         if (addKeyspaceOnDrop)
@@ -100,7 +115,9 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalidMessage("No supported secondary index found for the non primary key columns restrictions",
                              "SELECT * FROM %s where b = ?", 1);
         dropIndex("DROP INDEX IF EXISTS " + indexName);
-        assertInvalidMessage("Index '" + removeQuotes(indexName.toLowerCase(Locale.US)) + "' could not be found", "DROP INDEX " + indexName);
+        assertInvalidMessage(String.format("Index '%s' could not be found",
+                                           removeQuotes(indexName.toLowerCase(Locale.US))),
+                             "DROP INDEX " + indexName);
     }
 
     /**
@@ -189,12 +206,10 @@ public class SecondaryIndexTest extends CQLTester
     public void testUnknownCompressionOptions() throws Throwable
     {
         String tableName = createTableName();
-        assertInvalidThrow(SyntaxException.class, String.format(
-                                                               "CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression_parameters:sstable_compressor = 'DeflateCompressor'", tableName));
+        assertInvalidThrow(SyntaxException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression_parameters:sstable_compressor = 'DeflateCompressor'", tableName));
 
-
-        assertInvalidThrow(ConfigurationException.class, String.format(
-                                                                      "CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression = { 'sstable_compressor': 'DeflateCompressor' }", tableName));
+        assertInvalidThrow(ConfigurationException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression = { 'sstable_compressor': 'DeflateCompressor' }",
+                                                                      tableName));
     }
 
     /**
@@ -400,9 +415,6 @@ public class SecondaryIndexTest extends CQLTester
         assertRows(execute("SELECT k, v FROM %s WHERE k = 0 AND m CONTAINS KEY 'a'"), row(0, 0), row(0, 1));
         assertRows(execute("SELECT k, v FROM %s WHERE m CONTAINS KEY 'c'"), row(0, 2));
         assertEmpty(execute("SELECT k, v FROM %s  WHERE m CONTAINS KEY 'd'"));
-
-        // we're not allowed to create a value index if we already have a key one
-        assertInvalid("CREATE INDEX ON %s(m)");
     }
 
     /**
@@ -412,7 +424,7 @@ public class SecondaryIndexTest extends CQLTester
     @Test
     public void testIndexOnKeyWithReverseClustering() throws Throwable
     {
-        createTable(" CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2), v) ) WITH CLUSTERING ORDER BY (v DESC)");
+        createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2), v) ) WITH CLUSTERING ORDER BY (v DESC)");
 
         createIndex("CREATE INDEX ON %s (k2)");
 
@@ -591,4 +603,51 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalid("CREATE INDEX ON %s (c)");
     }
 
+    @Test
+    public void testMultipleIndexesOnOneColumn() throws Throwable
+    {
+        String indexClassName = StubIndex.class.getName();
+        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
+        // uses different options otherwise the two indexes are considered duplicates
+        createIndex(String.format("CREATE CUSTOM INDEX c_idx_1 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'a'}", indexClassName));
+        createIndex(String.format("CREATE CUSTOM INDEX c_idx_2 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'b'}", indexClassName));
+
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        CFMetaData cfm = cfs.metadata;
+        StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
+                                                                   .get("c_idx_1")
+                                                                   .orElseThrow(throwAssert("index not found")));
+        StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
+                                                                   .get("c_idx_2")
+                                                                   .orElseThrow(throwAssert("index not found")));
+        Object[] row1a = row(0, 0, 0);
+        Object[] row1b = row(0, 0, 1);
+        Object[] row2 = row(2, 2, 2);
+        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1a);
+        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1b);
+        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row2);
+
+        assertEquals(2, index1.rowsInserted.size());
+        assertColumnValue(0, "c", index1.rowsInserted.get(0), cfm);
+        assertColumnValue(2, "c", index1.rowsInserted.get(1), cfm);
+
+        assertEquals(2, index2.rowsInserted.size());
+        assertColumnValue(0, "c", index2.rowsInserted.get(0), cfm);
+        assertColumnValue(2, "c", index2.rowsInserted.get(1), cfm);
+
+        assertEquals(1, index1.rowsUpdated.size());
+        assertColumnValue(0, "c", index1.rowsUpdated.get(0).left, cfm);
+        assertColumnValue(1, "c", index1.rowsUpdated.get(0).right, cfm);
+
+        assertEquals(1, index2.rowsUpdated.size());
+        assertColumnValue(0, "c", index2.rowsUpdated.get(0).left, cfm);
+        assertColumnValue(1, "c", index2.rowsUpdated.get(0).right, cfm);
+    }
+
+    private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
+    {
+        ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
+        AbstractType<?> type = col.type;
+        assertEquals(expected, type.compose(row.getCell(col).value()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
index 8e1f438..e0879d2 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.junit.Test;
+
 import junit.framework.Assert;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
@@ -30,8 +32,6 @@ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.utils.FBUtilities;
 
-import org.junit.Test;
-
 
 public class CrcCheckChanceTest extends CQLTester
 {
@@ -49,7 +49,7 @@ public class CrcCheckChanceTest extends CQLTester
 
 
         ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable());
-        ColumnFamilyStore indexCfs = cfs.indexManager.getIndexesBackedByCfs().iterator().next();
+        ColumnFamilyStore indexCfs = cfs.indexManager.getAllIndexColumnFamilyStores().iterator().next();
         cfs.forceBlockingFlush();
 
         Assert.assertEquals(0.99, cfs.metadata.params.compression.getCrcCheckChance());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 792aaa7..7b03640 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -27,17 +27,15 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.RowFilter;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -115,12 +113,14 @@ public class CleanupTest
         fillCF(cfs, "birthdate", LOOPS);
         assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
 
-        SecondaryIndex index = cfs.indexManager.getIndexForColumn(cfs.metadata.getColumnDefinition(COLUMN));
+        ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
+        String indexName = cfs.metadata.getIndexes()
+                                       .get(cdef)
+                                       .iterator().next().name;
         long start = System.nanoTime();
-        while (!index.isIndexBuilt(COLUMN) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
+        while (!cfs.getBuiltIndexes().contains(indexName) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
             Thread.sleep(10);
 
-        ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
         RowFilter cf = RowFilter.create();
         cf.add(cdef, Operator.EQ, VALUE);
         assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).filterOn("birthdate", Operator.EQ, VALUE).build()).size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index cb38e37..40093ea 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -17,33 +17,39 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories.DataDirectory;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.utils.Pair;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class DirectoriesTest
 {
@@ -161,12 +167,13 @@ public class DirectoriesTest
                                   .addPartitionKey("thekey", UTF8Type.instance)
                                   .addClusteringColumn("col", UTF8Type.instance)
                                   .build();
-        IndexMetadata indexDef = IndexMetadata.legacyIndex(PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col")),
-                                                           "idx",
-                                                           IndexMetadata.IndexType.KEYS,
-                                                           Collections.emptyMap());
+        ColumnDefinition col = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
+        IndexMetadata indexDef = IndexMetadata.singleColumnIndex(col,
+                                                                 "idx",
+                                                                 IndexMetadata.IndexType.KEYS,
+                                                                 Collections.emptyMap());
         PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
-        CFMetaData INDEX_CFM = SecondaryIndex.newIndexMetadata(PARENT_CFM, indexDef);
+        CFMetaData INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);
         Directories parentDirectories = new Directories(PARENT_CFM);
         Directories indexDirectories = new Directories(INDEX_CFM);
         // secondary index has its own directory