You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/11/27 11:16:13 UTC

[1/4] cassandra git commit: Reject index queries while the index is building

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 d300a1850 -> 7b430eee6


Reject index queries while the index is building

patch by Benjamin Lerer; reviewed by Sam Tunnicliffe for CASSANDRA-8505


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/61e0251a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/61e0251a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/61e0251a

Branch: refs/heads/cassandra-3.0
Commit: 61e0251a1d4ddc695382aee11e443506afd40899
Parents: b3e6a43
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Nov 27 11:10:28 2015 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Nov 27 11:10:28 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../db/index/IndexNotAvailableException.java    |  49 ++++++
 .../cassandra/db/index/SecondaryIndex.java      |  28 +++-
 .../db/index/SecondaryIndexManager.java         |   6 +-
 .../db/index/composites/CompositesSearcher.java |   9 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |   4 +-
 .../cassandra/net/MessageDeliveryTask.java      |   7 +-
 .../validation/entities/SecondaryIndexTest.java | 159 ++++++++++++++++++-
 9 files changed, 244 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2940cc..63305d6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.4
+ * Reject index queries while the index is building (CASSANDRA-8505)
  * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
  * Fix JSON update with prepared statements (CASSANDRA-10631)
  * Don't do anticompaction after subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2d58219..8fefae2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+
 import javax.management.*;
 import javax.management.openmbean.*;
 
@@ -43,7 +44,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java b/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java
new file mode 100644
index 0000000..750e899
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/IndexNotAvailableException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index;
+
+/**
+ * Thrown if a secondary index is not currently available.
+ */
+public final class IndexNotAvailableException extends RuntimeException
+{
+    /**
+     * Creates a new <code>IndexNotAvailableException</code> for the specified index.
+     * @param name the index name
+     */
+    public IndexNotAvailableException(String name)
+    {
+        super(String.format("The secondary index '%s' is not yet available",
+                            removeTableNameIfNeeded(name)));
+    }
+
+    /**
+     * Extract the name of the index if necessary.
+     *
+     * @param name the index name prefixed by the tablename or not
+     * @return the index name
+     */
+    private static String removeTableNameIfNeeded(String name)
+    {
+        int index = name.indexOf('.');
+        if (index < 0)
+            return name;
+
+        return name.substring(index + 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 11626d6..cf2deeb 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -89,6 +89,12 @@ public abstract class SecondaryIndex
      */
     protected ColumnFamilyStore baseCfs;
 
+    // We need to keep track if the index is queryable or not to be sure that we can safely use it. If the index
+    // is still being built, using it will return incomplete results.
+    /**
+     * Specify if the index is queryable or not.
+     */
+    private volatile boolean queryable;
 
     /**
      * The column definitions which this index is responsible for
@@ -150,8 +156,18 @@ public abstract class SecondaryIndex
         return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnName));
     }
 
+    /**
+     * Checks if the index is ready.
+     * @return <code>true</code> if the index is ready, <code>false</code> otherwise
+     */
+    public boolean isQueryable()
+    {
+        return queryable;
+    }
+
     public void setIndexBuilt()
     {
+        queryable = true;
         for (ColumnDefinition columnDef : columnDefs)
             SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
     }
@@ -229,7 +245,7 @@ public abstract class SecondaryIndex
      *
      * @return A future object which the caller can block on (optional)
      */
-    public Future<?> buildIndexAsync()
+    public final Future<?> buildIndexAsync()
     {
         // if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
         boolean allAreBuilt = true;
@@ -243,7 +259,17 @@ public abstract class SecondaryIndex
         }
 
         if (allAreBuilt)
+        {
+            queryable = true;
             return null;
+        }
+
+        // If the base table is empty we can directly mark the index as built.
+        if (baseCfs.isEmpty())
+        {
+            setIndexBuilt();
+            return null;
+        }
 
         // build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
         // we want to block for a long period.  (actual build is serialized on CompactionManager.)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index c4fd068..6df8616 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -314,11 +314,7 @@ public class SecondaryIndexManager
         // Add to all indexes set:
         indexesByName.put(index.getIndexName(), index);
 
-        // if we're just linking in the index to indexedColumns on an
-        // already-built index post-restart, we're done
-        if (index.isIndexBuilt(cdef.name.bytes))
-            return null;
-
+        // We do not need to check if the index is already built as buildIndexAsync will do it for us
         return index.buildIndexAsync();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 2f85e35..a67aa2b 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -40,6 +40,8 @@ import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.index.IndexNotAvailableException;
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -60,12 +62,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     {
         assert filter.getClause() != null && !filter.getClause().isEmpty();
         final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
-        final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column);
+        final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
+        if (!index.isQueryable())
+            throw new IndexNotAvailableException(index.getIndexName());
+
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
         try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())
         {
-            return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter);
+            return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, (CompositesIndex) index), filter);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 86ff122..2b07c41 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -26,7 +26,6 @@ import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
@@ -55,6 +54,9 @@ public class KeysSearcher extends SecondaryIndexSearcher
         assert filter.getClause() != null && !filter.getClause().isEmpty();
         final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
         final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
+        if (!index.isQueryable())
+            throw new IndexNotAvailableException(index.getIndexName());
+
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room  being made
         try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index a46366c..4211f5a 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.db.index.IndexNotAvailableException;
 import org.apache.cassandra.gms.Gossiper;
 
 public class MessageDeliveryTask implements Runnable
@@ -70,10 +71,10 @@ public class MessageDeliveryTask implements Runnable
             handleFailure(ioe);
             throw new RuntimeException(ioe);
         }
-        catch (TombstoneOverwhelmingException toe)
+        catch (TombstoneOverwhelmingException | IndexNotAvailableException e)
         {
-            handleFailure(toe);
-            logger.error(toe.getMessage());
+            handleFailure(e);
+            logger.error(e.getMessage());
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/61e0251a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 0b812c6..b8f6b9f 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -17,23 +17,35 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.index.IndexNotAvailableException;
+import org.apache.cassandra.db.index.PerRowSecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.index.composites.CompositesSearcher;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.utils.FBUtilities;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.cassandra.utils.concurrent.OpOrder.Group;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
 
 public class SecondaryIndexTest extends CQLTester
 {
@@ -563,7 +575,7 @@ public class SecondaryIndexTest extends CQLTester
     {
         createTable("CREATE TABLE %s(a int, b frozen<map<int, blob>>, PRIMARY KEY (a))");
         createIndex("CREATE INDEX ON %s(full(b))");
-        Map<Integer, ByteBuffer> map = new HashMap();
+        Map<Integer, ByteBuffer> map = new HashMap<>();
         map.put(0, ByteBuffer.allocate(1024 * 65));
         failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", map);
     }
@@ -642,4 +654,135 @@ public class SecondaryIndexTest extends CQLTester
         assertInvalid("CREATE INDEX ON %s (c)");
     }
 
+    @Test
+    public void testIndexQueriesWithIndexNotReady() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+
+        for (int i = 0; i < 10; i++)
+            for (int j = 0; j < 10; j++)
+                execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", i, j, i + j);
+
+        createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName()
+                + "'");
+        try
+        {
+            execute("SELECT value FROM %s WHERE value = 2");
+            fail();
+        }
+        catch (IndexNotAvailableException e)
+        {
+            assertTrue(true);
+        }
+        finally
+        {
+            execute("DROP index " + KEYSPACE + ".testIndex");
+        }
+    }
+
+    /**
+     * Custom index used to test the behavior of the system when the index is not ready.
+     * As Custom indices cannot be <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
+     * to avoid the check but return a <code>CompositesSearcher</code>.
+     */
+    public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
+    {
+        private volatile CountDownLatch latch = new CountDownLatch(1);
+
+        @Override
+        public void index(ByteBuffer rowKey, ColumnFamily cf)
+        {
+            try
+            {
+                latch.await();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.interrupted();
+            }
+        }
+
+        @Override
+        public void delete(DecoratedKey key, Group opGroup)
+        {
+        }
+
+        @Override
+        public void init()
+        {
+        }
+
+        @Override
+        public void reload()
+        {
+        }
+
+        @Override
+        public void validateOptions() throws ConfigurationException
+        {
+        }
+
+        @Override
+        public String getIndexName()
+        {
+            return "testIndex";
+        }
+
+        @Override
+        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+        {
+            return new CompositesSearcher(baseCfs.indexManager, columns)
+            {
+                @Override
+                public boolean canHandleIndexClause(List<IndexExpression> clause)
+                {
+                    return true;
+                }
+
+                @Override
+                public void validate(IndexExpression indexExpression) throws InvalidRequestException
+                {
+                }
+            };
+        }
+
+        @Override
+        public void forceBlockingFlush()
+        {
+        }
+
+        @Override
+        public ColumnFamilyStore getIndexCfs()
+        {
+            return baseCfs;
+        }
+
+        @Override
+        public void removeIndex(ByteBuffer columnName)
+        {
+            latch.countDown();
+        }
+
+        @Override
+        public void invalidate()
+        {
+        }
+
+        @Override
+        public void truncateBlocking(long truncatedAt)
+        {
+        }
+
+        @Override
+        public boolean indexes(CellName name)
+        {
+            return false;
+        }
+
+        @Override
+        public long estimateResultRows()
+        {
+            return 0;
+        }
+    }
 }


[2/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Posted by bl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index d30937f,b8f6b9f..e8bf1fd
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@@ -17,38 -17,35 +17,44 @@@
   */
  package org.apache.cassandra.cql3.validation.entities;
  
- import java.nio.ByteBuffer;
- import java.util.*;
- 
- import org.apache.cassandra.db.DeletionTime;
- import org.apache.cassandra.utils.Pair;
- import org.apache.commons.lang3.StringUtils;
 +import org.junit.Before;
 +import org.junit.Test;
++import static org.apache.cassandra.Util.throwAssert;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
 -import java.util.List;
+ import java.util.Locale;
+ import java.util.Map;
 -import java.util.Set;
+ import java.util.UUID;
++import java.util.concurrent.Callable;
+ import java.util.concurrent.CountDownLatch;
  
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.IndexExpression;
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.index.IndexNotAvailableException;
 -import org.apache.cassandra.db.index.PerRowSecondaryIndex;
 -import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 -import org.apache.cassandra.db.index.composites.CompositesSearcher;
++import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.exceptions.SyntaxException;
 -import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.concurrent.OpOrder.Group;
++import org.apache.cassandra.index.IndexNotAvailableException;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.index.StubIndex;
++import org.apache.cassandra.index.internal.CustomCassandraIndex;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.ByteBufferUtil;
- 
- import static org.apache.cassandra.Util.throwAssert;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertTrue;
- import static org.junit.Assert.fail;
++import org.apache.cassandra.utils.Pair;
+ import org.apache.commons.lang3.StringUtils;
 -import org.junit.Test;
  
  public class SecondaryIndexTest extends CQLTester
  {
@@@ -668,16 -575,9 +674,16 @@@
      {
          createTable("CREATE TABLE %s(a int, b frozen<map<int, blob>>, PRIMARY KEY (a))");
          createIndex("CREATE INDEX ON %s(full(b))");
-         Map<Integer, ByteBuffer> map = new HashMap();
+         Map<Integer, ByteBuffer> map = new HashMap<>();
          map.put(0, ByteBuffer.allocate(1024 * 65));
          failInsert("INSERT INTO %s (a, b) VALUES (0, ?)", map);
 +        failInsert("INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS", map);
 +        failInsert("BEGIN BATCH\n" +
 +                   "INSERT INTO %s (a, b) VALUES (0, ?);\n" +
 +                   "APPLY BATCH", map);
 +        failInsert("BEGIN BATCH\n" +
 +                   "INSERT INTO %s (a, b) VALUES (0, ?) IF NOT EXISTS;\n" +
 +                   "APPLY BATCH", map);
      }
  
      public void failInsert(String insertCQL, Object...args) throws Throwable
@@@ -755,180 -655,134 +761,234 @@@
      }
  
      @Test
 +    public void testMultipleIndexesOnOneColumn() throws Throwable
 +    {
 +        String indexClassName = StubIndex.class.getName();
 +        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
 +        // uses different options otherwise the two indexes are considered duplicates
 +        createIndex(String.format("CREATE CUSTOM INDEX c_idx_1 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'a'}", indexClassName));
 +        createIndex(String.format("CREATE CUSTOM INDEX c_idx_2 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'b'}", indexClassName));
 +
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        CFMetaData cfm = cfs.metadata;
 +        StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
 +                                                                   .get("c_idx_1")
 +                                                                   .orElseThrow(throwAssert("index not found")));
 +        StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
 +                                                                   .get("c_idx_2")
 +                                                                   .orElseThrow(throwAssert("index not found")));
 +        Object[] row1a = row(0, 0, 0);
 +        Object[] row1b = row(0, 0, 1);
 +        Object[] row2 = row(2, 2, 2);
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1a);
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1b);
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row2);
 +
 +        assertEquals(2, index1.rowsInserted.size());
 +        assertColumnValue(0, "c", index1.rowsInserted.get(0), cfm);
 +        assertColumnValue(2, "c", index1.rowsInserted.get(1), cfm);
 +
 +        assertEquals(2, index2.rowsInserted.size());
 +        assertColumnValue(0, "c", index2.rowsInserted.get(0), cfm);
 +        assertColumnValue(2, "c", index2.rowsInserted.get(1), cfm);
 +
 +        assertEquals(1, index1.rowsUpdated.size());
 +        assertColumnValue(0, "c", index1.rowsUpdated.get(0).left, cfm);
 +        assertColumnValue(1, "c", index1.rowsUpdated.get(0).right, cfm);
 +
 +        assertEquals(1, index2.rowsUpdated.size());
 +        assertColumnValue(0, "c", index2.rowsUpdated.get(0).left, cfm);
 +        assertColumnValue(1, "c", index2.rowsUpdated.get(0).right, cfm);
 +    }
 +
 +    @Test
 +    public void testDeletions() throws Throwable
 +    {
 +        // Test for bugs like CASSANDRA-10694.  These may not be readily visible with the built-in secondary index
 +        // implementation because of the stale entry handling.
 +
 +        String indexClassName = StubIndex.class.getName();
 +        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
 +        createIndex(String.format("CREATE CUSTOM INDEX c_idx ON %%s(c) USING '%s'", indexClassName));
 +
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        CFMetaData cfm = cfs.metadata;
 +        StubIndex index1 = (StubIndex) cfs.indexManager.getIndex(cfm.getIndexes()
 +                .get("c_idx")
 +                .orElseThrow(throwAssert("index not found")));
 +
 +        execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING TIMESTAMP 1", 0, 0, 0);
 +        assertEquals(1, index1.rowsInserted.size());
 +
 +        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE a = ? AND b = ?", 0, 0);
 +        assertEquals(1, index1.rowsUpdated.size());
 +        Pair<Row, Row> update = index1.rowsUpdated.get(0);
 +        Row existingRow = update.left;
 +        Row newRow = update.right;
 +
 +        // check the existing row from the update call
 +        assertTrue(existingRow.deletion().isLive());
 +        assertEquals(DeletionTime.LIVE, existingRow.deletion().time());
 +        assertEquals(1L, existingRow.primaryKeyLivenessInfo().timestamp());
 +
 +        // check the new row from the update call
 +        assertFalse(newRow.deletion().isLive());
 +        assertEquals(2L, newRow.deletion().time().markedForDeleteAt());
 +        assertFalse(newRow.cells().iterator().hasNext());
 +
 +        // delete the same row again
 +        execute("DELETE FROM %s USING TIMESTAMP 3 WHERE a = ? AND b = ?", 0, 0);
 +        assertEquals(2, index1.rowsUpdated.size());
 +        update = index1.rowsUpdated.get(1);
 +        existingRow = update.left;
 +        newRow = update.right;
 +
 +        // check the new row from the update call
 +        assertFalse(existingRow.deletion().isLive());
 +        assertEquals(2L, existingRow.deletion().time().markedForDeleteAt());
 +        assertFalse(existingRow.cells().iterator().hasNext());
 +
 +        // check the existing row from the update call
 +        assertFalse(newRow.deletion().isLive());
 +        assertEquals(3L, newRow.deletion().time().markedForDeleteAt());
 +        assertFalse(newRow.cells().iterator().hasNext());
 +    }
 +
 +    @Test
 +    public void testUpdatesToMemtableData() throws Throwable
 +    {
 +        // verify the contract specified by Index.Indexer::updateRow(oldRowData, newRowData),
 +        // when a row in the memtable is updated, the indexer should be informed of:
 +        // * new columns
 +        // * removed columns
 +        // * columns whose value, timestamp or ttl have been modified.
 +        // Any columns which are unchanged by the update are not passed to the Indexer
 +        // Note that for simplicity this test resets the index between each scenario
 +        createTable("CREATE TABLE %s (k int, c int, v1 int, v2 int, PRIMARY KEY (k,c))");
 +        createIndex(String.format("CREATE CUSTOM INDEX test_index ON %%s() USING '%s'", StubIndex.class.getName()));
 +        execute("INSERT INTO %s (k, c, v1, v2) VALUES (0, 0, 0, 0) USING TIMESTAMP 0");
 +
 +        ColumnDefinition v1 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v1", true));
 +        ColumnDefinition v2 = getCurrentColumnFamilyStore().metadata.getColumnDefinition(new ColumnIdentifier("v2", true));
 +
 +        StubIndex index = (StubIndex)getCurrentColumnFamilyStore().indexManager.getIndexByName("test_index");
 +        assertEquals(1, index.rowsInserted.size());
 +
 +        // Overwrite a single value, leaving the other untouched
 +        execute("UPDATE %s USING TIMESTAMP 1 SET v1=1 WHERE k=0 AND c=0");
 +        assertEquals(1, index.rowsUpdated.size());
 +        Row oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(1, oldRow.size());
 +        validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(0), 0);
 +        Row newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(1, newRow.size());
 +        validateCell(newRow.getCell(v1), v1, ByteBufferUtil.bytes(1), 1);
 +        index.reset();
 +
 +        // Overwrite both values
 +        execute("UPDATE %s USING TIMESTAMP 2 SET v1=2, v2=2 WHERE k=0 AND c=0");
 +        assertEquals(1, index.rowsUpdated.size());
 +        oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(2, oldRow.size());
 +        validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(1), 1);
 +        validateCell(oldRow.getCell(v2), v2, ByteBufferUtil.bytes(0), 0);
 +        newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(2, newRow.size());
 +        validateCell(newRow.getCell(v1), v1, ByteBufferUtil.bytes(2), 2);
 +        validateCell(newRow.getCell(v2), v2, ByteBufferUtil.bytes(2), 2);
 +        index.reset();
 +
 +        // Delete one value
 +        execute("DELETE v1 FROM %s USING TIMESTAMP 3 WHERE k=0 AND c=0");
 +        assertEquals(1, index.rowsUpdated.size());
 +        oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(1, oldRow.size());
 +        validateCell(oldRow.getCell(v1), v1, ByteBufferUtil.bytes(2), 2);
 +        newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(1, newRow.size());
 +        Cell newCell = newRow.getCell(v1);
 +        assertTrue(newCell.isTombstone());
 +        assertEquals(3, newCell.timestamp());
 +        index.reset();
 +
 +        // Modify the liveness of the primary key, the delta rows should contain
 +        // no cell data as only the pk was altered, but it should illustrate the
 +        // change to the liveness info
 +        execute("INSERT INTO %s(k, c) VALUES (0, 0) USING TIMESTAMP 4");
 +        assertEquals(1, index.rowsUpdated.size());
 +        oldRow = index.rowsUpdated.get(0).left;
 +        assertEquals(0, oldRow.size());
 +        assertEquals(0, oldRow.primaryKeyLivenessInfo().timestamp());
 +        newRow = index.rowsUpdated.get(0).right;
 +        assertEquals(0, newRow.size());
 +        assertEquals(4, newRow.primaryKeyLivenessInfo().timestamp());
 +    }
 +
++    @Test
+     public void testIndexQueriesWithIndexNotReady() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+ 
+         for (int i = 0; i < 10; i++)
+             for (int j = 0; j < 10; j++)
+                 execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", i, j, i + j);
+ 
 -        createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName()
 -                + "'");
++        createIndex("CREATE CUSTOM INDEX testIndex ON %s (value) USING '" + IndexBlockingOnInitialization.class.getName() + "'");
+         try
+         {
+             execute("SELECT value FROM %s WHERE value = 2");
+             fail();
+         }
+         catch (IndexNotAvailableException e)
+         {
+             assertTrue(true);
+         }
+         finally
+         {
+             execute("DROP index " + KEYSPACE + ".testIndex");
+         }
+     }
+ 
 -    /**
 -     * Custom index used to test the behavior of the system when the index is not ready.
 -     * As Custom indices cannot by <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
 -     * to avoid the check but return a <code>CompositesSearcher</code>.
 -     */
 -    public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
 +    private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp)
      {
 -        private volatile CountDownLatch latch = new CountDownLatch(1);
 -
 -        @Override
 -        public void index(ByteBuffer rowKey, ColumnFamily cf)
 -        {
 -            try
 -            {
 -                latch.await();
 -            }
 -            catch (InterruptedException e)
 -            {
 -                Thread.interrupted();
 -            }
 -        }
 -
 -        @Override
 -        public void delete(DecoratedKey key, Group opGroup)
 -        {
 -        }
 -
 -        @Override
 -        public void init()
 -        {
 -        }
 +        assertNotNull(cell);
 +        assertEquals(0, def.type.compare(cell.value(), val));
 +        assertEquals(timestamp, cell.timestamp());
 +    }
  
 -        @Override
 -        public void reload()
 -        {
 -        }
 +    private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
 +    {
 +        ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
 +        AbstractType<?> type = col.type;
 +        assertEquals(expected, type.compose(row.getCell(col).value()));
 +    }
+ 
 -        @Override
 -        public void validateOptions() throws ConfigurationException
 -        {
 -        }
++    /**
++     * <code>CustomCassandraIndex</code> whose initialization task blocks until the index is invalidated.
++     */
++    public static class IndexBlockingOnInitialization extends CustomCassandraIndex
++    {
++        private final CountDownLatch latch = new CountDownLatch(1);
+ 
 -        @Override
 -        public String getIndexName()
++        public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+         {
 -            return "testIndex";
++            super(baseCfs, indexDef);
+         }
+ 
+         @Override
 -        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
++        public Callable<?> getInitializationTask()
+         {
 -            return new CompositesSearcher(baseCfs.indexManager, columns)
 -            {
 -                @Override
 -                public boolean canHandleIndexClause(List<IndexExpression> clause)
 -                {
 -                    return true;
 -                }
 -
 -                @Override
 -                public void validate(IndexExpression indexExpression) throws InvalidRequestException
 -                {
 -                }
++            return () -> {
++                latch.await();
++                return null;
+             };
+         }
+ 
+         @Override
 -        public void forceBlockingFlush()
 -        {
 -        }
 -
 -        @Override
 -        public ColumnFamilyStore getIndexCfs()
 -        {
 -            return baseCfs;
 -        }
 -
 -        @Override
 -        public void removeIndex(ByteBuffer columnName)
++        public Callable<?> getInvalidateTask()
+         {
+             latch.countDown();
 -        }
 -
 -        @Override
 -        public void invalidate()
 -        {
 -        }
 -
 -        @Override
 -        public void truncateBlocking(long truncatedAt)
 -        {
 -        }
 -
 -        @Override
 -        public boolean indexes(CellName name)
 -        {
 -            return false;
 -        }
 -
 -        @Override
 -        public long estimateResultRows()
 -        {
 -            return 0;
++            return super.getInvalidateTask();
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 0957f74,0000000..3bce683
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@@ -1,642 -1,0 +1,642 @@@
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +import static org.apache.cassandra.index.internal.CassandraIndex.getFunctions;
 +import static org.apache.cassandra.index.internal.CassandraIndex.indexCfsMetadata;
 +import static org.apache.cassandra.index.internal.CassandraIndex.parseTarget;
 +
 +/**
 + * Clone of KeysIndex used in CassandraIndexTest#testCustomIndexWithCFS to verify
 + * behaviour of flushing CFS backed CUSTOM indexes
 + */
 +public class CustomCassandraIndex implements Index
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 +
 +    public final ColumnFamilyStore baseCfs;
 +    protected IndexMetadata metadata;
 +    protected ColumnFamilyStore indexCfs;
 +    protected ColumnDefinition indexedColumn;
 +    protected CassandraIndexFunctions functions;
 +
 +    public CustomCassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        setMetadata(indexDef);
 +    }
 +
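 +    /**
 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
 +     * @param indexedColumn the column the predicate applies to (not consulted by this implementation)
 +     * @param operator the operator used by the search predicate
 +     * @return true only when the operator is <code>Operator.EQ</code>
 +     */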
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        return operator.equals(Operator.EQ);
 +    }
 +
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return indexedColumn;
 +    }
 +
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        return indexCfs.metadata.comparator;
 +    }
 +
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return indexCfs;
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this);
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
 +        // if we're just linking in the index on an already-built index post-restart
-         // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
-         return isBuilt() ? null : getBuildIndexTask();
++        // or if the table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
++        return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask();
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
 +    }
 +
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        return () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return () -> {
 +            invalidate();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        setMetadata(indexDef);
 +        return () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +    }
 +
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
 +        functions = getFunctions(indexDef, target);
 +        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             cfm.cfName,
 +                                                             cfm,
 +                                                             baseCfs.getTracker().loadsstables);
 +        indexedColumn = target.left;
 +    }
 +
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        return () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        return true;
 +    }
 +
 +    public boolean indexes(PartitionColumns columns)
 +    {
 +        // if we have indexes on the partition key or clustering columns, return true
 +        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return column.equals(indexedColumn);
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return indexedColumn.name.equals(column.name)
 +               && supportsOperator(indexedColumn, operator);
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null;
 +    }
 +
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        return supportsExpression(expression.column(), expression.operator());
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * No post processing of query results, just return them unchanged
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator;
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        return getTargetExpression(filter.getExpressions()).map(filter::without)
 +                                                           .orElse(filter);
 +    }
 +
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        return expressions.stream().filter(this::supportsExpression).findFirst();
 +    }
 +
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        return null;
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                break;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                break;
 +            case REGULAR:
 +                validateRows(update);
 +                break;
 +            case STATIC:
 +                validateRows(Collections.singleton(update.staticRow()));
 +                break;
 +        }
 +    }
 +
 +    protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                               ClusteringPrefix prefix,
 +                                               CellPath path)
 +    {
 +        CBuilder builder = CBuilder.create(getIndexComparator());
 +        builder.add(partitionKey);
 +        return builder;
 +    }
 +
 +    protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                      Clustering clustering,
 +                                      CellPath path, ByteBuffer cellValue)
 +    {
 +        return cellValue;
 +    }
 +
 +    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
 +    {
 +        throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
 +    }
 +
 +    public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
 +    {
 +        if (row == null)
 +            return true;
 +
 +        Cell cell = row.getCell(indexedColumn);
 +
 +        return (cell == null
 +             || !cell.isLive(nowInSec)
 +             || indexedColumn.type.compare(indexValue, cell.value()) != 0);
 +    }
 +
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
 +        return new Indexer()
 +        {
 +            public void begin()
 +            {
 +            }
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            public void insertRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void finish()
 +            {
 +            }
 +
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    removeCell(clustering, cell);
 +            }
 +
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
 +                long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int ttl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell cell : row.cells())
 +                {
 +                    long cellTimestamp = cell.timestamp();
 +                    if (cell.isLive(nowInSec))
 +                    {
 +                        if (cellTimestamp > timestamp)
 +                        {
 +                            timestamp = cellTimestamp;
 +                            ttl = cell.ttl();
 +                        }
 +                    }
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes, this is called by a
 +     * searcher when it encounters a stale entry in the index
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion deletion timestamp etc
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        doDelete(indexKey, indexClustering, deletion, opGroup);
 +        logger.debug("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Called when adding a new entry to the index
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
 +        PartitionUpdate upd = partitionUpdate(valueKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.debug("Inserted entry into index for value {}", valueKey);
 +    }
 +
 +    /**
 +     * Called when deleting entries on non-primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, cell),
 +                 new DeletionTime(cell.timestamp(), nowInSec),
 +                 opGroup);
 +    }
 +
 +    /**
 +     * Called when deleting entries from indexes on primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               null));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, null),
 +                 deletion,
 +                 opGroup);
 +    }
 +
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
 +        PartitionUpdate upd = partitionUpdate(indexKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.debug("Removed index entry for value {}", indexKey);
 +    }
 +
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null ));
 +    }
 +
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row row : update)
 +            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
 +    }
 +
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (indexedColumn.isComplex())
 +            {
 +                ComplexColumnData data = row.getComplexColumnData(indexedColumn);
 +                if (data != null)
 +                {
 +                    for (Cell cell : data)
 +                    {
 +                        validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
 +                    }
 +                }
 +            }
 +            else
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +            }
 +        }
 +    }
 +
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
 +        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
 +            throw new InvalidRequestException(String.format(
 +                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                                           value.remaining(),
 +                                                           metadata.name,
 +                                                           baseCfs.metadata.ksName,
 +                                                           baseCfs.metadata.cfName,
 +                                                           indexedColumn.name.toString(),
 +                                                           FBUtilities.MAX_UNSIGNED_SHORT));
 +    }
 +
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               cell == null ? null : cell.path(),
 +                               cell == null ? null : cell.value()
 +        );
 +    }
 +
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        return buildIndexClusteringPrefix(rowKey,
 +                                          clustering,
 +                                          cell == null ? null : cell.path()).build();
 +    }
 +
 +    private DecoratedKey getIndexKeyFor(ByteBuffer value)
 +    {
 +        return indexCfs.decorateKey(value);
 +    }
 +
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 +    }
 +
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions
 +        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
 +        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
 +        CompactionManager.instance.waitForCessation(cfss);
 +        indexCfs.keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    private boolean isBuilt()
 +    {
 +        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name);
 +    }
 +
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        return indexedColumn.isPrimaryKeyColumn();
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    private void buildBlocking()
 +    {
 +        baseCfs.forceBlockingFlush();
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            metadata.name);
 +                baseCfs.indexManager.markIndexBuilt(metadata.name);
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}",
 +                        metadata.name,
 +                        getSSTableNames(sstables));
 +
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                      Collections.singleton(this),
 +                                                                      new ReducingKeyIterator(sstables));
 +            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +            FBUtilities.waitOnFuture(future);
 +            indexCfs.forceBlockingFlush();
 +            baseCfs.indexManager.markIndexBuilt(metadata.name);
 +        }
 +        logger.info("Index build of {} complete", metadata.name);
 +    }
 +
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        return StreamSupport.stream(sstables.spliterator(), false)
 +                            .map(SSTableReader::toString)
 +                            .collect(Collectors.joining(", "));
 +    }
 +}


[4/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Posted by bl...@apache.org.
Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b430eee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b430eee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b430eee

Branch: refs/heads/cassandra-3.0
Commit: 7b430eee69d8f70b086a30a6e9d3c42a9db4aa08
Parents: d300a18 61e0251
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Nov 27 11:15:08 2015 +0100
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Nov 27 11:16:02 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ReadCommand.java    |  9 +-
 .../index/IndexNotAvailableException.java       | 34 ++++++++
 .../cassandra/index/SecondaryIndexManager.java  | 64 +++++++++++---
 .../index/internal/CassandraIndex.java          |  6 +-
 .../cassandra/net/MessageDeliveryTask.java      |  7 +-
 .../validation/entities/SecondaryIndexTest.java | 88 ++++++++++++++++----
 .../index/internal/CustomCassandraIndex.java    |  4 +-
 8 files changed, 178 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5e6c03c,63305d6..252972c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
 -2.2.4
 +3.0.1
 + * Fix SELECT statement with IN restrictions on partition key,
 +   ORDER BY and LIMIT (CASSANDRA-10729)
 + * Improve stress performance over 1k threads (CASSANDRA-7217)
 + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
 + * Unable to create a function with argument of type Inet (CASSANDRA-10741)
 + * Fix backward incompatibility in CqlInputFormat (CASSANDRA-10717)
 + * Correctly preserve deletion info on updated rows when notifying indexers
 +   of single-row deletions (CASSANDRA-10694)
 + * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
 + * Keep the file open in trySkipCache (CASSANDRA-10669)
 + * Updated trigger example (CASSANDRA-10257)
 +Merged from 2.2:
+  * Reject index queries while the index is building (CASSANDRA-8505)
   * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
   * Fix JSON update with prepared statements (CASSANDRA-10631)
   * Don't do anticompaction after subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 301cb86,cd86336..5ab1ee5
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -17,92 -17,39 +17,93 @@@
   */
  package org.apache.cassandra.db;
  
 -import java.io.DataInput;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 +import java.util.*;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.filter.IDiskAtomFilter;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
 -import org.apache.cassandra.db.filter.SliceQueryFilter;
 +import com.google.common.collect.Lists;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.index.Index;
++import org.apache.cassandra.index.IndexNotAvailableException;
  import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.service.IReadCommand;
 -import org.apache.cassandra.service.RowDataResolver;
 -import org.apache.cassandra.service.pager.Pageable;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.UnknownIndexException;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
  
 -public abstract class ReadCommand implements IReadCommand, Pageable
 +/**
 + * General interface for storage-engine read commands (common to both range and
 + * single partition commands).
 + * <p>
 + * This contains all the information needed to do a local read.
 + */
 +public abstract class ReadCommand implements ReadQuery
  {
 -    public enum Type
 +    protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
 +
 +    public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 +    // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version.
 +    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
 +    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer();
 +
 +    public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
 +
 +    private final Kind kind;
 +    private final CFMetaData metadata;
 +    private final int nowInSec;
 +
 +    private final ColumnFilter columnFilter;
 +    private final RowFilter rowFilter;
 +    private final DataLimits limits;
 +
 +    // SecondaryIndexManager will attempt to provide the most selective of any available indexes
 +    // during execution. Here we also store the results of that lookup to avoid repeating it over
 +    // the lifetime of the command.
 +    protected Optional<IndexMetadata> index = Optional.empty();
 +
 +    // Flag to indicate whether the index manager has been queried to select an index for this
 +    // command. This is necessary as the result of that lookup may be null, in which case we
 +    // still don't want to repeat it.
 +    private boolean indexManagerQueried = false;
 +
 +    private boolean isDigestQuery;
 +    // if a digest query, the version for which the digest is expected. Ignored if not a digest.
 +    private int digestVersion;
 +    private final boolean isForThrift;
 +
 +    protected static abstract class SelectionDeserializer
      {
 -        GET_BY_NAMES((byte)1),
 -        GET_SLICES((byte)2);
 +        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
 +    }
  
 -        public final byte serializedValue;
 +    protected enum Kind
 +    {
 +        SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
 +        PARTITION_RANGE  (PartitionRangeReadCommand.selectionDeserializer);
  
 -        private Type(byte b)
 -        {
 -            this.serializedValue = b;
 -        }
 +        private final SelectionDeserializer selectionDeserializer;
  
 -        public static Type fromSerializedValue(byte b)
 +        Kind(SelectionDeserializer selectionDeserializer)
          {
 -            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +            this.selectionDeserializer = selectionDeserializer;
          }
      }
  
@@@ -231,707 -95,55 +232,713 @@@
          return this;
      }
  
 -    public String getColumnFamilyName()
 +    /**
 +     * Sets the digest version, for when digest for that command is requested.
 +     * <p>
 +     * Note that we allow setting this independently of setting the command as a digest query as
 +     * this allows us to use the command as a carrier of the digest version even if we only call
 +     * setIsDigestQuery on some copy of it.
 +     *
 +     * @param digestVersion the version for the digest if this command is used for a digest query.
 +     * @return this read command.
 +     */
 +    public ReadCommand setDigestVersion(int digestVersion)
      {
 -        return cfName;
 +        this.digestVersion = digestVersion;
 +        return this;
      }
  
 +    /**
 +     * Whether this query is for thrift or not.
 +     *
 +     * @return whether this query is for thrift.
 +     */
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    /**
 +     * The clustering index filter this command uses for the provided key.
 +     * <p>
 +     * Note that this method should only be called on a key actually queried by this command
 +     * and in practice, this will almost always return the same filter, but for the sake of
 +     * paging, the filter on the first key of a range command might be slightly different.
 +     *
 +     * @param key a partition key queried by this command.
 +     *
 +     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
 +     */
 +    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
 +
 +    /**
 +     * Returns a copy of this command.
 +     *
 +     * @return a copy of this command.
 +     */
      public abstract ReadCommand copy();
  
 -    public abstract Row getRow(Keyspace keyspace);
 +    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +
 +    protected abstract int oldestUnrepairedTombstone();
 +
 +    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
 +    {
 +        return isDigestQuery()
 +             ? ReadResponse.createDigestResponse(iterator, digestVersion)
 +             : ReadResponse.createDataResponse(iterator, selection);
 +    }
 +
 +    public long indexSerializedSize(int version)
 +    {
 +        if (index.isPresent())
 +            return IndexMetadata.serializer.serializedSize(index.get(), version);
 +        else
 +            return 0;
 +    }
 +
 +    public Index getIndex(ColumnFamilyStore cfs)
 +    {
 +        // if we've already consulted the index manager, and it returned a valid index
 +        // the result should be cached here.
 +        if(index.isPresent())
 +            return cfs.indexManager.getIndex(index.get());
 +
 +        // if no cached index is present, but we've already consulted the index manager
 +        // then no registered index is suitable for this command, so just return null.
 +        if (indexManagerQueried)
 +            return null;
 +
 +        // do the lookup, set the flag to indicate so and cache the result if not null
 +        Index selected = cfs.indexManager.getBestIndexFor(this);
 +        indexManagerQueried = true;
  
 -    public abstract IDiskAtomFilter filter();
 +        if (selected == null)
 +            return null;
  
 -    public String getKeyspace()
 +        index = Optional.of(selected.getIndexMetadata());
 +        return selected;
 +    }
 +
 +    /**
 +     * Executes this command on the local host.
 +     *
 +     * @param orderGroup the operation group spanning this command
 +     *
 +     * @return an iterator over the result of executing this command locally.
 +     */
 +    @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
 +                                  // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
 +    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
      {
 -        return ksName;
 +        long startTimeNanos = System.nanoTime();
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
 +        Index index = getIndex(cfs);
-         Index.Searcher searcher = index == null ? null : index.searcherFor(this);
 +
++        Index.Searcher searcher = null;
 +        if (index != null)
++        {
++            if (!cfs.indexManager.isIndexQueryable(index))
++                throw new IndexNotAvailableException(index);
++
++            searcher = index.searcherFor(this);
 +            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
++        }
 +
 +        UnfilteredPartitionIterator resultIterator = searcher == null
 +                                         ? queryStorage(cfs, orderGroup)
 +                                         : searcher.search(orderGroup);
 +
 +        try
 +        {
 +            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 +
 +            // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
 +            // no point in checking it again.
 +            RowFilter updatedFilter = searcher == null
 +                                    ? rowFilter()
 +                                    : index.getPostIndexQueryFilter(rowFilter());
 +
 +            // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
 +            // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
 +            // would be more efficient (the sooner we discard stuff we know we don't care, the less useless
 +            // processing we do on it).
 +            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            resultIterator.close();
 +            throw e;
 +        }
      }
  
 -    // maybeGenerateRetryCommand is used to generate a retry for short reads
 -    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
 +    protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 +
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
      {
 -        return null;
 +        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
      }
  
 -    // maybeTrim removes columns from a response that is too long
 -    public Row maybeTrim(Row row)
 +    public ReadOrderGroup startOrderGroup()
      {
 -        return row;
 +        return ReadOrderGroup.forCommand(this);
      }
  
 -    public long getTimeout()
 +    /**
 +     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
 +     * This also logs a warning or throws a TombstoneOverwhelmingException if appropriate.
 +     */
 +    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
      {
 -        return DatabaseDescriptor.getReadRpcTimeout();
 +        class MetricRecording extends Transformation<UnfilteredRowIterator>
 +        {
 +            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
 +            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 +
 +            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +
 +            private int liveRows = 0;
 +            private int tombstones = 0;
 +
 +            private DecoratedKey currentKey;
 +
 +            @Override
 +            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +            {
 +                currentKey = iter.partitionKey();
 +                return Transformation.apply(iter, this);
 +            }
 +
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                return applyToRow(row);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (row.hasLiveData(ReadCommand.this.nowInSec()))
 +                    ++liveRows;
 +
 +                for (Cell cell : row.cells())
 +                {
 +                    if (!cell.isLive(ReadCommand.this.nowInSec()))
 +                        countTombstone(row.clustering());
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                countTombstone(marker.clustering());
 +                return marker;
 +            }
 +
 +            private void countTombstone(ClusteringPrefix clustering)
 +            {
 +                ++tombstones;
 +                if (tombstones > failureThreshold && respectTombstoneThresholds)
 +                {
 +                    String query = ReadCommand.this.toCQLString();
 +                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
 +                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
 +                }
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                recordLatency(metric, System.nanoTime() - startTimeNanos);
 +
 +                metric.tombstoneScannedHistogram.update(tombstones);
 +                metric.liveScannedHistogram.update(liveRows);
 +
 +                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
 +                if (warnTombstones)
 +                {
 +                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
 +                    ClientWarn.warn(msg);
 +                    logger.warn(msg);
 +                }
 +
 +                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
 +            }
 +        };
 +
 +        return Transformation.apply(iter, new MetricRecording());
      }
 -}
  
 -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 -{
 -    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +    /**
 +     * Creates a message for this command.
 +     */
 +    public abstract MessageOut<ReadCommand> createMessage(int version);
 +
 +    protected abstract void appendCQLWhereClause(StringBuilder sb);
 +
 +    // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
 +    // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
 +    // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
 +    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
 +    {
 +        final boolean isForThrift = iterator.isForThrift();
 +        class WithoutPurgeableTombstones extends PurgeFunction
 +        {
 +            public WithoutPurgeableTombstones()
 +            {
 +                super(isForThrift, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            }
 +
 +            protected long getMaxPurgeableTimestamp()
 +            {
 +                return Long.MAX_VALUE;
 +            }
 +        }
 +        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
 +    }
 +
 +    /**
 +     * Recreate the CQL string corresponding to this query.
 +     * <p>
 +     * Note that in general the returned string will not be exactly the original user string, first
 +     * because there isn't always a single syntax for a given query,  but also because we don't have
 +     * all the information needed (we know the non-PK columns queried but not the PK ones as internally
 +     * we query them all). So this shouldn't be relied upon too strongly, but this should be good enough for
 +     * debugging purposes, which is what this is for.
 +     */
 +    public String toCQLString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("SELECT ").append(columnFilter());
 +        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
 +        appendCQLWhereClause(sb);
 +
 +        if (limits() != DataLimits.NONE)
 +            sb.append(' ').append(limits());
 +        return sb.toString();
 +    }
 +
 +    private static class Serializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        private static int digestFlag(boolean isDigest)
 +        {
 +            return isDigest ? 0x01 : 0;
 +        }
 +
 +        private static boolean isDigest(int flags)
 +        {
 +            return (flags & 0x01) != 0;
 +        }
 +
 +        private static int thriftFlag(boolean isForThrift)
 +        {
 +            return isForThrift ? 0x02 : 0;
 +        }
 +
 +        private static boolean isForThrift(int flags)
 +        {
 +            return (flags & 0x02) != 0;
 +        }
 +
 +        private static int indexFlag(boolean hasIndex)
 +        {
 +            return hasIndex ? 0x04 : 0;
 +        }
 +
 +        private static boolean hasIndex(int flags)
 +        {
 +            return (flags & 0x04) != 0;
 +        }
 +
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            out.writeByte(command.kind.ordinal());
 +            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
 +            if (command.isDigestQuery())
 +                out.writeUnsignedVInt(command.digestVersion());
 +            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            out.writeInt(command.nowInSec());
 +            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
 +            RowFilter.serializer.serialize(command.rowFilter(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version);
 +            if (command.index.isPresent())
 +                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 +
 +            command.serializeSelection(out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                return legacyReadCommandSerializer.deserialize(in, version);
 +
 +            Kind kind = Kind.values()[in.readByte()];
 +            int flags = in.readByte();
 +            boolean isDigest = isDigest(flags);
 +            boolean isForThrift = isForThrift(flags);
 +            boolean hasIndex = hasIndex(flags);
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
 +            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
 +            int nowInSec = in.readInt();
 +            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
 +            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, version);
 +            Optional<IndexMetadata> index = hasIndex
 +                                            ? deserializeIndexMetadata(in, version, metadata)
 +                                            : Optional.empty();
 +
 +            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
 +        }
 +
 +        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
 +        {
 +            try
 +            {
 +                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
 +            }
 +            catch (UnknownIndexException e)
 +            {
 +                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
 +                                               "If an index was just created, this is likely due to the schema not " +
 +                                               "being fully propagated. Local read will proceed without using the " +
 +                                               "index. Please wait for schema agreement after index creation.",
 +                                               cfm.ksName, cfm.cfName, e.indexId.toString());
 +                logger.info(message);
 +                return Optional.empty();
 +            }
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            return 2 // kind + flags
 +                 + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
 +                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
 +                 + TypeSizes.sizeof(command.nowInSec())
 +                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
 +                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
 +                 + DataLimits.serializer.serializedSize(command.limits(), version)
 +                 + command.selectionSerializedSize(version)
 +                 + command.indexSerializedSize(version);
 +        }
 +    }
 +
 +    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0
 +    // compatibility
 +    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                legacyRangeSliceCommandSerializer.serialize(command, out, version);
 +            else
 +                serializer.serialize(command, out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
 +                 : serializer.deserialize(in, version);
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
 +                 : serializer.serializedSize(command, version);
 +        }
 +    }
 +
 +    private enum LegacyType
 +    {
 +        GET_BY_NAMES((byte)1),
 +        GET_SLICES((byte)2);
 +
 +        public final byte serializedValue;
 +
 +        LegacyType(byte b)
 +        {
 +            this.serializedValue = b;
 +        }
 +
 +        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
 +        {
 +            return kind == ClusteringIndexFilter.Kind.SLICE
 +                   ? GET_SLICES
 +                   : GET_BY_NAMES;
 +        }
 +
 +        public static LegacyType fromSerializedValue(byte b)
 +        {
 +            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +        }
 +    }
 +
 +    /**
 +     * Serializer for pre-3.0 RangeSliceCommands.
 +     */
 +    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
      {
 -        out.writeByte(command.commandType.serializedValue);
 -        switch (command.commandType)
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            assert !rangeCommand.dataRange().isPaging();
 +
 +            // convert pre-3.0 incompatible names filters to slice filters
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            out.writeUTF(metadata.ksName);
 +            out.writeUTF(metadata.cfName);
 +            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
 +
 +            // begin DiskAtomFilterSerializer.serialize()
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                out.writeByte(1);  // 0 for slices, 1 for names
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
 +            }
 +            else
 +            {
 +                out.writeByte(0);  // 0 for slices, 1 for names
 +
 +                // slice filter serialization
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
 +
 +                out.writeBoolean(filter.isReversed());
 +
 +                // limit
 +                DataLimits.Kind kind = rangeCommand.limits().kind();
 +                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
 +                if (isDistinct)
 +                    out.writeInt(1);
 +                else
 +                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
 +
 +                int compositesToGroup;
 +                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                if (kind == DataLimits.Kind.THRIFT_LIMIT)
 +                    compositesToGroup = -1;
 +                else if (isDistinct && !selectsStatics)
 +                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
 +                else
 +                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
 +
 +                out.writeInt(compositesToGroup);
 +            }
 +
 +            serializeRowFilter(out, rangeCommand.rowFilter());
 +            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
 +
 +            // maxResults
 +            out.writeInt(rangeCommand.limits().count());
 +
 +            // countCQL3Rows
 +            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
 +                out.writeBoolean(false);
 +            else
 +                out.writeBoolean(true);
 +
 +            // isPaging
 +            out.writeBoolean(false);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            String keyspace = in.readUTF();
 +            String columnFamily = in.readUTF();
 +
 +            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 +            if (metadata == null)
 +            {
 +                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
 +                throw new UnknownColumnFamilyException(message, null);
 +            }
 +
 +            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
 +
 +            ClusteringIndexFilter filter;
 +            ColumnFilter selection;
 +            int compositesToGroup = 0;
 +            int perPartitionLimit = -1;
 +            byte readType = in.readByte();  // 0 for slices, 1 for names
 +            if (readType == 1)
 +            {
 +                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
 +                selection = selectionAndFilter.left;
 +                filter = selectionAndFilter.right;
 +            }
 +            else
 +            {
 +                Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
 +                filter = p.left;
 +                perPartitionLimit = in.readInt();
 +                compositesToGroup = in.readInt();
 +                selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
 +            }
 +
 +            RowFilter rowFilter = deserializeRowFilter(in, metadata);
 +
 +            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
 +            int maxResults = in.readInt();
 +
 +            in.readBoolean();  // countCQL3Rows (not needed)
 +            in.readBoolean();  // isPaging (not needed)
 +
 +            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
 +            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
 +            DataLimits limits;
 +            if (isDistinct)
 +                limits = DataLimits.distinctLimits(maxResults);
 +            else if (compositesToGroup == -1)
 +                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
 +            else
 +                limits = DataLimits.cqlLimits(maxResults);
 +
 +            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
 +        }
 +
 +        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
 +        {
 +            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
 +            out.writeInt(indexExpressions.size());
 +            for (RowFilter.Expression expression : indexExpressions)
 +            {
 +                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
 +                expression.operator().writeTo(out);
 +                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
 +            }
 +        }
 +
 +        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
 +        {
 +            int numRowFilters = in.readInt();
 +            if (numRowFilters == 0)
 +                return RowFilter.NONE;
 +
 +            RowFilter rowFilter = RowFilter.create(numRowFilters);
 +            for (int i = 0; i < numRowFilters; i++)
 +            {
 +                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
 +                ColumnDefinition column = metadata.getColumnDefinition(columnName);
 +                Operator op = Operator.readFrom(in);
 +                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
 +                rowFilter.add(column, op, indexValue);
 +            }
 +            return rowFilter;
 +        }
 +
 +        static long serializedRowFilterSize(RowFilter rowFilter)
 +        {
 +            long size = TypeSizes.sizeof(0);  // rowFilterCount
 +            for (RowFilter.Expression expression : rowFilter)
 +            {
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                size += TypeSizes.sizeof(0);  // operator int value
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +            }
 +            return size;
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version < MessagingService.VERSION_30;
 +            assert command.kind == Kind.PARTITION_RANGE;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            long size = TypeSizes.sizeof(metadata.ksName);
 +            size += TypeSizes.sizeof(metadata.cfName);
 +            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
 +
 +            size += 1;  // single byte flag: 0 for slices, 1 for names
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
 +            }
 +            else
 +            {
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
 +                size += TypeSizes.sizeof(filter.isReversed());
 +                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
 +                size += TypeSizes.sizeof(0); // compositesToGroup
 +            }
 +
 +            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
 +            {
 +                size += TypeSizes.sizeof(0);
 +            }
 +            else
 +            {
 +                ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
 +                size += TypeSizes.sizeof(indexExpressions.size());
 +                for (RowFilter.Expression expression : indexExpressions)
 +                {
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                    size += TypeSizes.sizeof(expression.operator().ordinal());
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +                }
 +            }
 +
 +            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
 +            size += TypeSizes.sizeof(rangeCommand.limits().count());
 +            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
 +            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
 +        }
 +
 +        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
          {
 -            case GET_BY_NAMES:
 -                SliceByNamesReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            case GET_SLICES:
 -                SliceFromReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            default:
 -                throw new AssertionError();
 +            if (!command.dataRange().isNamesQuery())
 +                return command;
 +
 +            CFMetaData metadata = command.metadata();
 +            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
 +                return command;
 +
 +            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
 +            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
 +            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
 +            return new PartitionRangeReadCommand(
 +                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
 +                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
 +        }
 +
 +        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
 +        {
 +            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
 +            // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
 +            // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
 +            if (compositesToGroup == -2)
 +                return ColumnFilter.all(metadata);
 +
 +            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
 +            PartitionColumns columns = selectsStatics
 +                                     ? metadata.partitionColumns()
 +                                     : metadata.partitionColumns().withoutStatics();
 +            return ColumnFilter.selectionBuilder().addAll(columns).build();
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/index/IndexNotAvailableException.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/IndexNotAvailableException.java
index 0000000,0000000..5440e2a
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/IndexNotAvailableException.java
@@@ -1,0 -1,0 +1,34 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.index;
++
++/**
++ * Thrown if a secondary index is not currently available.
++ */
++public final class IndexNotAvailableException extends RuntimeException
++{
++    /**
++     * Creates a new <code>IndexNotAvailableException</code> for the specified index.
++     * @param index the index that is not available; its metadata name is included in the exception message
++     */
++    public IndexNotAvailableException(Index index)
++    {
++        super(String.format("The secondary index '%s' is not yet available", index.getIndexMetadata().name));
++    }
++}


[3/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Posted by bl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index df8e38d,0000000..ba2c680
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -1,1012 -1,0 +1,1052 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index;
 +
 +import java.lang.reflect.Constructor;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Strings;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
++import com.google.common.collect.Sets;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.MoreExecutors;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.transactions.*;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.Indexes;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +/**
 + * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
 + * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
 + * and so on.
 + *
 + * The Index interface defines a number of methods which return Callable<?>. These are primarily the
 + * management tasks for an index implementation. Most of them are currently executed in a blocking
 + * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
 + * much all cases, as tasks like flushing an index needs to be executed synchronously to avoid potentially
 + * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could
 + * then be defined as void and called directly from SIM (rather than being run via the executor service).
 + * Separating the task definition from execution gives us greater flexibility though, so that in future, for example,
 + * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
 + *
 + * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
 + * involve a significant effort, building a new index over any existing data. We perform this task asynchronously;
 + * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
 + * indexes is performed on the CompactionManager.
 + *
 + * This class also provides instances of processors which listen to updates to the base table and forward to
 + * registered Indexes the info required to keep those indexes up to date.
 + * There are two variants of these processors, each with a factory method provided by SIM:
 + *      IndexTransaction: deals with updates generated on the regular write path.
 + *      CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
 + * Further details on their usage and lifecycles can be found in the interface definitions below.
 + *
 + * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
 + * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
 + * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
 + * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
 + * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
 + * a target replica.
 + */
 +public class SecondaryIndexManager implements IndexRegistry
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 +
 +    private Map<String, Index> indexes = Maps.newConcurrentMap();
 +
++    /**
++     * The indexes that are ready to serve requests.
++     */
++    private Set<String> builtIndexes = Sets.newConcurrentHashSet();
++
 +    // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
 +    private static final ExecutorService asyncExecutor =
 +        new JMXEnabledThreadPoolExecutor(1,
 +                                         StageManager.KEEPALIVE,
 +                                         TimeUnit.SECONDS,
 +                                         new LinkedBlockingQueue<>(),
 +                                         new NamedThreadFactory("SecondaryIndexManagement"),
 +                                         "internal");
 +
 +    // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
 +    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
 +
 +    /**
 +     * The underlying column family containing the source data for these indexes
 +     */
 +    public final ColumnFamilyStore baseCfs;
 +
 +    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
 +    {
 +        this.baseCfs = baseCfs;
 +    }
 +
 +
 +    /**
 +     * Drops and adds new indexes associated with the underlying CF
 +     */
 +    public void reload()
 +    {
 +        // figure out what needs to be added and dropped.
 +        Indexes tableIndexes = baseCfs.metadata.getIndexes();
 +        indexes.keySet()
 +               .stream()
 +               .filter(indexName -> !tableIndexes.has(indexName))
 +               .forEach(this::removeIndex);
 +
 +        // we call add for every index definition in the collection as
 +        // some may not have been created here yet, only added to schema
 +        for (IndexMetadata tableIndex : tableIndexes)
 +            addIndex(tableIndex);
 +    }
 +
 +    private Future<?> reloadIndex(IndexMetadata indexDef)
 +    {
 +        Index index = indexes.get(indexDef.name);
 +        Callable<?> reloadTask = index.getMetadataReloadTask(indexDef);
 +        return reloadTask == null
 +               ? Futures.immediateFuture(null)
-                : blockingExecutor.submit(index.getMetadataReloadTask(indexDef));
++               : blockingExecutor.submit(reloadTask);
 +    }
 +
 +    private Future<?> createIndex(IndexMetadata indexDef)
 +    {
 +        Index index = createInstance(indexDef);
 +        index.register(this);
++
 +        // if the index didn't register itself, we can probably assume that no initialization needs to happen
 +        final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name)
 +                                           ? index.getInitializationTask()
 +                                           : null;
-         return initialBuildTask == null
-                ? Futures.immediateFuture(null)
-                : asyncExecutor.submit(initialBuildTask);
++        if (initialBuildTask == null)
++        {
++            // We need to make sure that the index is marked as built in the case where the initialBuildTask
++            // does not need to be run (if the index didn't register itself or if the base table was empty).
++            markIndexBuilt(indexDef.name);
++            return Futures.immediateFuture(null);
++        }
++        return asyncExecutor.submit(initialBuildTask); // submit the task that was null-checked above; do not ask the index for a second one
 +    }
 +
 +    /**
 +     * Adds and builds an index
 +     * @param indexDef the IndexMetadata describing the index
 +     */
 +    public synchronized Future<?> addIndex(IndexMetadata indexDef)
 +    {
 +        if (indexes.containsKey(indexDef.name))
 +            return reloadIndex(indexDef);
 +        else
 +            return createIndex(indexDef);
 +    }
 +
++    /**
++     * Checks if the specified index is queryable.
++     *
++     * @param index the index
++     * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
++     */
++    public boolean isIndexQueryable(Index index)
++    {
++        return builtIndexes.contains(index.getIndexMetadata().name);
++    }
++
 +    public synchronized void removeIndex(String indexName)
 +    {
-         Index index = indexes.remove(indexName);
++        Index index = unregisterIndex(indexName);
 +        if (null != index)
 +        {
 +            markIndexRemoved(indexName);
 +            executeBlocking(index.getInvalidateTask());
 +        }
 +    }
 +
 +
 +    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
 +    {
 +        if (indexes.isEmpty())
 +            return Collections.emptySet();
 +
 +        Set<IndexMetadata> dependentIndexes = new HashSet<>();
 +        for (Index index : indexes.values())
 +            if (index.dependsOn(column))
 +                dependentIndexes.add(index.getIndexMetadata());
 +
 +        return dependentIndexes;
 +    }
 +
- 
 +    /**
 +     * Called when dropping a Table
 +     */
 +    public void markAllIndexesRemoved()
 +    {
 +       getBuiltIndexNames().forEach(this::markIndexRemoved);
 +    }
 +
 +    /**
 +    * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
 +    * Caller must acquire and release references to the sstables used here.
 +    * Note also that only this method of (re)building indexes:
 +    *   a) takes a set of index *names* rather than Indexers
 +    *   b) marks existing indexes removed prior to rebuilding
 +    *
 +    * @param sstables the data to build from
 +    * @param indexNames the list of indexes to be rebuilt
 +    */
 +    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
 +    {
 +        Set<Index> toRebuild = indexes.values().stream()
 +                                               .filter(index -> indexNames.contains(index.getIndexMetadata().name))
 +                                               .filter(Index::shouldBuildBlocking)
 +                                               .collect(Collectors.toSet());
 +        if (toRebuild.isEmpty())
 +        {
 +            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
 +            return;
 +        }
 +
 +        toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name));
 +
 +        buildIndexesBlocking(sstables, toRebuild);
 +
 +        toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name));
 +    }
 +
 +    /**
 +     * Performs a blocking build, from the supplied sstables, of every registered index that reports
 +     * <code>shouldBuildBlocking()</code>. This method does not itself call <code>markIndexBuilt</code>.
 +     *
 +     * @param sstables the data to build from
 +     */
 +    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
 +    {
 +        buildIndexesBlocking(sstables, indexes.values()
 +                                              .stream()
 +                                              .filter(Index::shouldBuildBlocking)
 +                                              .collect(Collectors.toSet()));
 +    }
 +
 +    // For convenience, may be called directly from Index impls.
 +    // Builds the given index from the base table's CANONICAL sstable set and then marks it as built;
 +    // does nothing when the index reports that it should not be built blocking.
 +    public void buildIndexBlocking(Index index)
 +    {
 +        if (index.shouldBuildBlocking())
 +        {
 +            // both resources are closed by the try-with-resources once the build has returned
 +            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +                 Refs<SSTableReader> sstables = viewFragment.refs)
 +            {
 +                buildIndexesBlocking(sstables, Collections.singleton(index));
 +                markIndexBuilt(index.getIndexMetadata().name);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Checks if the specified {@link ColumnFamilyStore} is a secondary index.
 +     * The decision is made on the store's name only, see {@link #isIndexColumnFamily(String)}.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs)
 +    {
 +        return isIndexColumnFamily(cfs.name);
 +    }
 +
 +    /**
 +     * Checks if the specified name is that of a secondary index {@link ColumnFamilyStore}, i.e. whether
 +     * it contains <code>Directories.SECONDARY_INDEX_NAME_SEPARATOR</code>.
 +     *
 +     * @param cfName the name of the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamily(String cfName)
 +    {
 +        return cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the parent of the specified {@link ColumnFamilyStore}.
 +     * The parent is looked up by name in the keyspace of the supplied store.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>; its name must be a secondary index name
 +     * (see {@link #getParentCfsName(String)})
 +     * @return the parent of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs)
 +    {
 +        String parentCfs = getParentCfsName(cfs.name);
 +        return cfs.keyspace.getColumnFamilyStore(parentCfs);
 +    }
 +
 +    /**
 +     * Returns the parent name of the specified {@link ColumnFamilyStore}, i.e. the part of the name
 +     * that precedes the first <code>Directories.SECONDARY_INDEX_NAME_SEPARATOR</code>.
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name; asserted to be a secondary index name
 +     * @return the parent name of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static String getParentCfsName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringBefore(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the index name, derived from the name of the supplied store
 +     * (see {@link #getIndexName(String)}).
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the index name
 +     */
 +    public static String getIndexName(ColumnFamilyStore cfs)
 +    {
 +        return getIndexName(cfs.name);
 +    }
 +
 +    /**
 +     * Returns the index name, i.e. the part of the supplied name that follows the first
 +     * <code>Directories.SECONDARY_INDEX_NAME_SEPARATOR</code>.
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name; asserted to be a secondary index name
 +     * @return the index name
 +     */
 +    public static String getIndexName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Builds the given indexes from the given sstables and blocks until the build and the
 +     * subsequent flush of those indexes have completed. Does nothing if the set of indexes is empty.
 +     * This method does not mark the indexes as built; that is left to the callers.
 +     *
 +     * @param sstables the data to build from
 +     * @param indexes the indexes to build
 +     */
 +    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        logger.info("Submitting index build of {} for data in {}",
 +                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
 +                    sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
 +
 +        // the build is submitted to the CompactionManager and waited on here
 +        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                  indexes,
 +                                                                  new ReducingKeyIterator(sstables));
 +        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +        FBUtilities.waitOnFuture(future);
 +
 +        flushIndexesBlocking(indexes);
 +        logger.info("Index build of {} complete",
 +                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")));
 +    }
 +
++    /**
++     * Marks the specified index as built, both in the in-memory <code>builtIndexes</code> set and in the system keyspace.
++     * <p>This method is public as it needs to be accessible from the {@link Index} implementations</p>
++     * @param indexName the index name
++     */
 +    public void markIndexBuilt(String indexName)
 +    {
++        builtIndexes.add(indexName);
 +        SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
 +    }
 +
++    /**
++     * Marks the specified index as removed in the system keyspace.
++     * <p>This method is public as it needs to be accessible from the {@link Index} implementations</p>
++     * @param indexName the index name
++     */
 +    public void markIndexRemoved(String indexName)
 +    {
 +        // NOTE(review): unlike markIndexBuilt this does not touch builtIndexes - confirm that is intended
 +        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
 +    }
 +
- 
 +    /**
 +     * Looks up a registered index by name.
 +     *
 +     * @param indexName the index name
 +     * @return whatever the registry holds under that name (presumably <code>null</code> if there is none)
 +     */
 +    public Index getIndexByName(String indexName)
 +    {
 +        return indexes.get(indexName);
 +    }
 +
 +    /**
 +     * Instantiates the {@link Index} implementation described by the supplied metadata.
 +     * <p>For a custom index the implementation class name is read from the index options under
 +     * <code>IndexTarget.CUSTOM_INDEX_OPTION_NAME</code> and the class is instantiated reflectively through
 +     * its public <code>(ColumnFamilyStore, IndexMetadata)</code> constructor. Any other index is created
 +     * by <code>CassandraIndex.newIndex</code>.</p>
 +     *
 +     * @param indexDef the metadata of the index to instantiate
 +     * @return the new index instance
 +     * @throws RuntimeException wrapping any exception raised while loading or instantiating a custom index class
 +     */
 +    private Index createInstance(IndexMetadata indexDef)
 +    {
 +        Index newIndex;
 +        if (indexDef.isCustom())
 +        {
 +            assert indexDef.options != null;
 +            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
 +            assert ! Strings.isNullOrEmpty(className);
 +            try
 +            {
 +                Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
-                 Constructor ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
++                Constructor<? extends Index> ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
 +                newIndex = (Index)ctor.newInstance(baseCfs, indexDef);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +        else
 +        {
 +            newIndex = CassandraIndex.newIndex(baseCfs, indexDef);
 +        }
 +        return newIndex;
 +    }
 +
 +    /**
 +     * Truncate all indexes, blocking until every truncate task supplied by the indexes has completed.
 +     *
 +     * @param truncatedAt value handed to each index's <code>getTruncateTask</code>
 +     * (presumably the truncation timestamp - confirm the unit against the caller)
 +     */
 +    public void truncateAllIndexesBlocking(final long truncatedAt)
 +    {
 +        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
 +    }
 +
 +    /**
 +     * Remove all indexes: first marks all built indexes as removed, then runs the invalidate task
 +     * of every registered index and blocks until those tasks have completed.
 +     */
 +    public void invalidateAllIndexesBlocking()
 +    {
 +        markAllIndexesRemoved();
 +        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
 +    }
 +
 +    /**
 +     * Perform a blocking flush of all registered indexes.
 +     * The flush operates on an immutable snapshot of the registry taken when the method is called.
 +     */
 +    public void flushAllIndexesBlocking()
 +    {
 +       flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
 +    }
 +
 +    /**
 +     * Perform a blocking flush of selected indexes. Does nothing if the set is empty.
 +     *
 +     * @param indexes the indexes to flush
 +     */
 +    public void flushIndexesBlocking(Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        List<Future<?>> wait = new ArrayList<>();
 +        List<Index> nonCfsIndexes = new ArrayList<>();
 +
 +        // for each CFS backed index, submit a flush task which we'll wait on for completion
 +        // for the non-CFS backed indexes, we'll flush those while we wait.
 +        // The flushes are submitted while holding the monitor of the base table's tracker.
 +        synchronized (baseCfs.getTracker())
 +        {
 +            // map/orElseGet are used purely for their side effects: an index with a backing table
 +            // contributes a flush future, any other index is queued in nonCfsIndexes
 +            indexes.forEach(index ->
 +                index.getBackingTable()
 +                     .map(cfs -> wait.add(cfs.forceFlush()))
 +                     .orElseGet(() -> nonCfsIndexes.add(index)));
 +        }
 +
 +        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
 +        FBUtilities.waitOnFutures(wait);
 +    }
 +
 +    /**
 +     * Performs a blocking flush of all indexes which are not backed by a table,
 +     * i.e. those whose <code>getBackingTable()</code> is not present.
 +     */
 +    public void flushAllNonCFSBackedIndexesBlocking()
 +    {
 +        Set<Index> customIndexers = indexes.values().stream()
 +                                                    .filter(index -> !(index.getBackingTable().isPresent()))
 +                                                    .collect(Collectors.toSet());
 +        flushIndexesBlocking(customIndexers);
 +    }
 +
 +    /**
 +     * Asks the system keyspace which of the currently registered index names are recorded as built.
 +     *
 +     * @return all indexes which are marked as built and ready to use
 +     */
 +    public List<String> getBuiltIndexNames()
 +    {
 +        // gather the names of every registered index; SystemKeyspace narrows them down to the built ones
 +        Set<String> allIndexNames = new HashSet<>();
 +        indexes.values().stream()
 +                .map(i -> i.getIndexMetadata().name)
 +                .forEach(allIndexNames::add);
 +        return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
 +    }
 +
 +    /**
 +     * Indexes without a backing table contribute nothing to the result.
 +     *
 +     * @return all backing Tables used by registered indexes, as a new mutable set
 +     */
 +    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
 +    {
 +        Set<ColumnFamilyStore> backingTables = new HashSet<>();
 +        indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
 +        return backingTables;
 +    }
 +
 +    /**
 +     * @return <code>true</code> if there are ANY indexes registered for this table, <code>false</code> otherwise
 +     */
 +    public boolean hasIndexes()
 +    {
 +        return !indexes.isEmpty();
 +    }
 +
 +    /**
 +     * When building an index against existing data in sstables, add the given partition to the index.
 +     * Does nothing if the set of indexes is empty.
 +     *
 +     * @param partition the partition to index; it is consumed through a filtered view created with nowInSec
 +     * @param opGroup the operation group handed to each index when obtaining its indexer
 +     * @param indexes the indexes to add the partition to
 +     * @param nowInSec the current time in seconds, used for filtering and handed to each indexer
 +     */
 +    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
 +    {
 +        if (!indexes.isEmpty())
 +        {
 +            DecoratedKey key = partition.partitionKey();
 +            // one indexer per index, all created as UPDATE-type indexers for this partition key
 +            Set<Index.Indexer> indexers = indexes.stream()
 +                                                 .map(index -> index.indexerFor(key,
 +                                                                                nowInSec,
 +                                                                                opGroup,
 +                                                                                IndexTransaction.Type.UPDATE))
 +                                                 .collect(Collectors.toSet());
 +
 +            indexers.forEach(Index.Indexer::begin);
 +
 +            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
 +            {
 +                // the static row, when not empty, is inserted before the regular rows
 +                if (!filtered.staticRow().isEmpty())
 +                    indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
 +
 +                while (filtered.hasNext())
 +                {
 +                    Row row = filtered.next();
 +                    indexers.forEach(indexer -> indexer.insertRow(row));
 +                }
 +            }
 +
 +            indexers.forEach(Index.Indexer::finish);
 +        }
 +    }
 +
 +    /**
 +     * Delete all data from all indexes for this partition.
 +     * For when cleanup rips a partition out entirely.
 +     *
 +     * One cleanup transaction is committed for the partition level deletion, followed by a separate
 +     * transaction for every row of the partition.
 +     *
 +     * TODO : improve cleanup transaction to batch updates & perform them async
 +     *
 +     * @param partition the partition being removed; it is iterated to exhaustion by this method
 +     * @param nowInSec the current time in seconds, used for the deletion time and handed to the transactions
 +     */
 +    public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
 +    {
 +        // we need to acquire memtable lock because secondary index deletion may
 +        // cause a race (see CASSANDRA-3712). This is done internally by the
 +        // index transaction when it commits
 +        CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                                    partition.columns(),
 +                                                                    nowInSec);
 +        indexTransaction.start();
 +        indexTransaction.onPartitionDeletion(new DeletionTime(FBUtilities.timestampMicros(), nowInSec));
 +        indexTransaction.commit();
 +
 +        while (partition.hasNext())
 +        {
 +            Unfiltered unfiltered = partition.next();
 +            // anything that is not a row is skipped
 +            if (unfiltered.kind() != Unfiltered.Kind.ROW)
 +                continue;
 +
 +            indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                     partition.columns(),
 +                                                     nowInSec);
 +            indexTransaction.start();
 +            indexTransaction.onRowDelete((Row)unfiltered);
 +            indexTransaction.commit();
 +        }
 +    }
 +
 +    /**
 +     * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
 +     *
 +     * This is a two step process, firstly compiling the set of searchable indexes then choosing the one which reduces
 +     * the search space the most.
 +     *
 +     * In the first phase, if the command's RowFilter contains a custom index expression, the index that it
 +     * specifies is returned immediately. Otherwise, the registered indexes are filtered to include only those
 +     * which support the standard expressions in the RowFilter.
 +     *
 +     * The filtered set is then ordered by selectivity, as reported by the Index implementations' getEstimatedResultRows
 +     * method, and the index with the lowest estimate is selected.
 +     *
 +     * Implementation specific validation of the target expression, either custom or standard, by the selected
 +     * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
 +     * the validity of the expression.
 +     *
 +     * This method is only called once during the lifecycle of a ReadCommand and the result is
 +     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
 +     * ReadOrderGroup, or an estimate of the result size from an average index query.
 +     *
 +     * @param command ReadCommand to be executed
 +     * @return an Index instance, ready to use during execution of the command, or null if none
 +     * of the registered indexes can support the command.
 +     */
 +    public Index getBestIndexFor(ReadCommand command)
 +    {
 +        if (indexes.isEmpty() || command.rowFilter().isEmpty())
 +            return null;
 +
 +        Set<Index> searchableIndexes = new HashSet<>();
 +        for (RowFilter.Expression expression : command.rowFilter())
 +        {
 +            if (expression.isCustom())
 +            {
 +                // Only a single custom expression is allowed per query and, if present,
 +                // we want to always favour the index specified in such an expression
 +                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
 +                logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
 +                Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
 +                return indexes.get(customExpression.getTargetIndex().name);
 +            }
 +            else
 +            {
 +                indexes.values().stream()
 +                       .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
 +                       .forEach(searchableIndexes::add);
 +            }
 +        }
 +
 +        if (searchableIndexes.isEmpty())
 +        {
 +            logger.trace("No applicable indexes found");
 +            Tracing.trace("No applicable indexes found");
 +            return null;
 +        }
 +
 +        // with several candidates, pick the one reporting the smallest estimated number of result rows
 +        Index selected = searchableIndexes.size() == 1
 +                         ? Iterables.getOnlyElement(searchableIndexes)
 +                         : searchableIndexes.stream()
 +                                            .min((a, b) -> Longs.compare(a.getEstimatedResultRows(),
 +                                                                         b.getEstimatedResultRows()))
 +                                            .orElseThrow(() -> new AssertionError("Could not select most selective index"));
 +
 +        // pay for an additional threadlocal get() rather than build the strings unnecessarily
 +        if (Tracing.isTracing())
 +        {
 +            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
 +                          searchableIndexes.stream().map(i -> i.getIndexMetadata().name + ':' + i.getEstimatedResultRows())
 +                                           .collect(Collectors.joining(",")),
 +                          selected.getIndexMetadata().name);
 +        }
 +        return selected;
 +    }
 +
 +    /**
 +     * Called at write time to ensure that values present in the update
 +     * are valid according to the rules of all registered indexes which
 +     * will process it. The partition key as well as the clustering and
 +     * cell values for each row in the update may be checked by index
 +     * implementations. Only indexes which report that they index the
 +     * columns of the update are consulted.
 +     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
 +     * @throws InvalidRequestException presumably raised by an index's own validate method when it rejects the update
 +     */
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        indexes.values()
 +               .stream()
 +               .filter(i -> i.indexes(update.columns()))
 +               .forEach(i -> i.validate(update));
 +    }
 +
 +    /**
 +     * IndexRegistry methods
 +     *
 +     * Adds the given index to the registry, keyed by the name found in its metadata.
 +     */
 +    public void registerIndex(Index index)
 +    {
-         indexes.put(index.getIndexMetadata().name, index);
-         logger.trace("Registered index {}", index.getIndexMetadata().name);
++        String name = index.getIndexMetadata().name;
++        indexes.put(name, index);
++        logger.trace("Registered index {}", name);
 +    }
 +
 +    // Removes the given index from the registry, using the name found in its metadata
 +    public void unregisterIndex(Index index)
 +    {
-         Index removed = indexes.remove(index.getIndexMetadata().name);
++        unregisterIndex(index.getIndexMetadata().name);
++    }
++
++    // Removes the named index from the registry and from the set of built indexes;
++    // returns whatever the registry held under that name (null when the trace message says "not registered")
++    private Index unregisterIndex(String name)
++    {
++        Index removed = indexes.remove(name);
++        builtIndexes.remove(name);
 +        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
-                      index.getIndexMetadata().name);
++                     name);
++        return removed;
 +    }
 +
 +    /**
 +     * Looks up a registered index using the name held in the supplied metadata.
 +     *
 +     * @param metadata the metadata whose name identifies the index
 +     * @return whatever the registry holds under that name (presumably <code>null</code> if there is none)
 +     */
 +    public Index getIndex(IndexMetadata metadata)
 +    {
 +        return indexes.get(metadata.name);
 +    }
 +
 +    /**
 +     * @return an immutable snapshot of the currently registered indexes
 +     */
 +    public Collection<Index> listIndexes()
 +    {
 +        return ImmutableSet.copyOf(indexes.values());
 +    }
 +
 +    /**
 +     * Handling of index updates.
 +     * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
 +     * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
 +     */
 +
 +    /**
 +     * Transaction for updates on the write path.
 +     *
 +     * @param update the partition update being applied; its columns decide which indexes take part
 +     * @param opGroup the operation group handed to each participating index's indexer
 +     * @param nowInSec the current time in seconds, handed to each indexer
 +     * @return <code>UpdateTransaction.NO_OP</code> if no index is registered or none indexes the
 +     * update's columns, otherwise a new transaction wrapping one indexer per interested index
 +     */
 +    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
 +    {
 +        if (!hasIndexes())
 +            return UpdateTransaction.NO_OP;
 +
 +        // todo : optimize lookup, we can probably cache quite a bit of stuff, rather than doing
 +        // a linear scan every time. Holding off that though until CASSANDRA-7771 to figure out
 +        // exactly how indexes are to be identified & associated with a given partition update
 +        Index.Indexer[] indexers = indexes.values().stream()
 +                                          .filter(i -> i.indexes(update.columns()))
 +                                          .map(i -> i.indexerFor(update.partitionKey(),
 +                                                                 nowInSec,
 +                                                                 opGroup,
 +                                                                 IndexTransaction.Type.UPDATE))
 +                                          .toArray(Index.Indexer[]::new);
 +
 +        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
 +    }
 +
 +    /**
 +     * Transaction for use when merging rows during compaction
 +     *
 +     * @param key the key of the partition being compacted
 +     * @param partitionColumns the columns used to decide which indexes take part
 +     * @param versions handed to the transaction, which uses it to size its per-version row array
 +     * @param nowInSec the current time in seconds
 +     * @return <code>CompactionTransaction.NO_OP</code> if no registered index indexes the given columns,
 +     * otherwise a new transaction over the interested indexes
 +     */
 +    public CompactionTransaction newCompactionTransaction(DecoratedKey key,
 +                                                          PartitionColumns partitionColumns,
 +                                                          int versions,
 +                                                          int nowInSec)
 +    {
 +        // the check for whether there are any registered indexes is already done in CompactionIterator
 +
 +        Index[] interestedIndexes = indexes.values().stream()
 +                                           .filter(i -> i.indexes(partitionColumns))
 +                                           .toArray(Index[]::new);
 +
 +        return interestedIndexes.length == 0
 +               ? CompactionTransaction.NO_OP
 +               : new IndexGCTransaction(key, versions, nowInSec, interestedIndexes);
 +    }
 +
 +    /**
 +     * Transaction for use when removing partitions during cleanup
 +     *
 +     * @param key the key of the partition being cleaned up
 +     * @param partitionColumns the columns used to decide which indexes take part
 +     * @param nowInSec the current time in seconds
 +     * @return <code>CleanupTransaction.NO_OP</code> if no index is registered or none indexes the
 +     * given columns, otherwise a new transaction over the interested indexes
 +     */
 +    public CleanupTransaction newCleanupTransaction(DecoratedKey key,
 +                                                    PartitionColumns partitionColumns,
 +                                                    int nowInSec)
 +    {
 +        // nothing to do when no index is registered at all
 +        if (!hasIndexes())
 +            return CleanupTransaction.NO_OP;
 +
 +        Index[] interestedIndexes = indexes.values().stream()
 +                                           .filter(i -> i.indexes(partitionColumns))
 +                                           .toArray(Index[]::new);
 +
 +        return interestedIndexes.length == 0
 +               ? CleanupTransaction.NO_OP
 +               : new CleanupGCTransaction(key, nowInSec, interestedIndexes);
 +    }
 +
 +    /**
 +     * A single use transaction for processing a partition update on the regular write path.
 +     * Every callback is fanned out to each of the indexers supplied at construction.
 +     */
 +    private static final class WriteTimeTransaction implements UpdateTransaction
 +    {
 +        private final Index.Indexer[] indexers;
 +
 +        private WriteTimeTransaction(Index.Indexer...indexers)
 +        {
 +            // don't allow null indexers, if we don't need any the factory method returns UpdateTransaction.NO_OP
 +            for (Index.Indexer indexer : indexers) assert indexer != null;
 +            this.indexers = indexers;
 +        }
 +
 +        // begins every indexer
 +        public void start()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.begin();
 +        }
 +
 +        // forwards a partition level deletion to every indexer
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.partitionDelete(deletionTime);
 +        }
 +
 +        // forwards a range tombstone to every indexer
 +        public void onRangeTombstone(RangeTombstone tombstone)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.rangeTombstone(tombstone);
 +        }
 +
 +        // forwards a newly inserted row to every indexer
 +        public void onInserted(Row row)
 +        {
 +            Arrays.stream(indexers).forEach(h -> h.insertRow(row));
 +        }
 +
 +        // Builds two delta rows from the diff of the updated and existing rows - the cells to remove
 +        // from and the cells to insert into the indexes - and passes both to every indexer.
 +        public void onUpdated(Row existing, Row updated)
 +        {
 +            final Row.Builder toRemove = BTreeRow.sortedBuilder();
 +            toRemove.newRow(existing.clustering());
 +            toRemove.addPrimaryKeyLivenessInfo(existing.primaryKeyLivenessInfo());
 +            toRemove.addRowDeletion(existing.deletion());
 +            final Row.Builder toInsert = BTreeRow.sortedBuilder();
 +            toInsert.newRow(updated.clustering());
 +            toInsert.addPrimaryKeyLivenessInfo(updated.primaryKeyLivenessInfo());
 +            toInsert.addRowDeletion(updated.deletion());
 +            // diff listener collates the columns to be added & removed from the indexes
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                // only cell level differences contribute to the delta rows
 +                // (presumably merged comes from the updated row and original from the existing one - confirm against Rows.diff)
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (merged != null && !merged.equals(original))
 +                        toInsert.addCell(merged);
 +
 +                    if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
 +                        toRemove.addCell(original);
 +
 +                }
 +            };
 +            Rows.diff(diffListener, updated, existing);
 +            Row oldRow = toRemove.build();
 +            Row newRow = toInsert.build();
 +            for (Index.Indexer indexer : indexers)
 +                indexer.updateRow(oldRow, newRow);
 +        }
 +
 +        // finishes every indexer
 +        public void commit()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.finish();
 +        }
 +
 +        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
 +        {
 +            // If either the value or timestamp is different, then we
 +            // should delete from the index. If not, then we can infer that
 +            // at least one of the cells is an ExpiringColumn and that the
 +            // difference is in the expiry time. In this case, we don't want to
 +            // delete the old value from the index as the tombstone we insert
 +            // will just hide the inserted value.
 +            // NOTE(review): an earlier version of this comment said identical cells never get here
 +            // because of an equals check in StandardUpdater.update; the call in onCell is not guarded
 +            // by such a check, identical cells simply make this method return false.
 +            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during compaction where the only
 +     * operation is to merge rows. The cells that are dropped by a merge are collected per input version and
 +     * removed from every interested index on commit.
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class IndexGCTransaction implements CompactionTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final int versions;
 +        private final int nowInSec;
 +        private final Index[] indexes;
 +
 +        // delta rows, one slot per input version; allocated in start(), stays null when versions is not positive
 +        private Row[] rows;
 +
 +        private IndexGCTransaction(DecoratedKey key,
 +                                   int versions,
 +                                   int nowInSec,
 +                                   Index...indexes)
 +        {
 +            // don't allow null indexers, if we don't have any, use a noop transaction
 +            for (Index index : indexes) assert index != null;
 +
 +            this.key = key;
 +            this.versions = versions;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        // allocates the per-version row array
 +        public void start()
 +        {
 +            if (versions > 0)
 +                rows = new Row[versions];
 +        }
 +
 +        // records, for each input version, a row holding the cells which are absent from the merged result
 +        public void onRowMerge(Row merged, Row...versions)
 +        {
 +            // Diff listener constructs rows representing deltas between the merged and original versions
 +            // These delta rows are then passed to registered indexes for removal processing
 +            final Row.Builder[] builders = new Row.Builder[versions.length];
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    // a cell present in version i but missing from the merged row goes into that version's delta
 +                    if (original != null && merged == null)
 +                    {
 +                        // builders are created lazily, so versions without dropped cells produce no row
 +                        if (builders[i] == null)
 +                        {
 +                            builders[i] = BTreeRow.sortedBuilder();
 +                            builders[i].newRow(clustering);
 +                        }
 +                        builders[i].addCell(original);
 +                    }
 +                }
 +            };
 +
 +            Rows.diff(diffListener, merged, versions);
 +
 +            // NOTE(review): assumes start() has allocated rows with at least versions.length slots - confirm against callers
 +            for(int i = 0; i < builders.length; i++)
 +                if (builders[i] != null)
 +                    rows[i] = builders[i].build();
 +        }
 +
 +        // removes every collected delta row from every interested index, within a single write operation group
 +        public void commit()
 +        {
 +            if (rows == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.COMPACTION);
 +                    indexer.begin();
 +                    for (Row row : rows)
 +                        if (row != null)
 +                            indexer.removeRow(row);
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during cleanup, where
 +     * partitions and rows are only removed. It remembers at most one partition deletion and one row
 +     * (the most recently supplied of each) and applies them to every interested index on commit.
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class CleanupGCTransaction implements CleanupTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final int nowInSec;
 +        private final Index[] indexes;
 +
 +        // the pending removals, null until the corresponding callback has been invoked
 +        private Row row;
 +        private DeletionTime partitionDelete;
 +
 +        private CleanupGCTransaction(DecoratedKey key,
 +                                     int nowInSec,
 +                                     Index...indexes)
 +        {
 +            // don't allow null indexers, if we don't have any, use a noop transaction
 +            for (Index index : indexes) assert index != null;
 +
 +            this.key = key;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        // nothing to prepare, all work happens in commit()
 +        public void start()
 +        {
 +        }
 +
 +        // remembers the partition deletion to apply on commit
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            partitionDelete = deletionTime;
 +        }
 +
 +        // remembers the row to remove on commit
 +        public void onRowDelete(Row row)
 +        {
 +            this.row = row;
 +        }
 +
 +        // applies the remembered partition deletion and/or row removal to every interested index
 +        public void commit()
 +        {
 +            if (row == null && partitionDelete == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.CLEANUP);
 +                    indexer.begin();
 +
 +                    if (partitionDelete != null)
 +                        indexer.partitionDelete(partitionDelete);
 +
 +                    if (row != null)
 +                        indexer.removeRow(row);
 +
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Submits the task to the blocking executor and waits for it to complete.
 +     *
 +     * @param task the task to run; a <code>null</code> task is silently ignored
 +     */
 +    private static void executeBlocking(Callable<?> task)
 +    {
 +        if (null != task)
 +            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
 +    }
 +
 +    /**
 +     * Obtains one task per index, submits all of them to the blocking executor and then waits for
 +     * every submitted task to complete.
 +     *
 +     * @param indexers the indexes to obtain tasks for
 +     * @param function produces the task for an index; indexes for which it returns <code>null</code> are skipped
 +     */
 +    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
 +    {
 +        List<Future<?>> waitFor = new ArrayList<>();
 +        indexers.forEach(indexer -> {
 +            Callable<?> task = function.apply(indexer);
 +            if (null != task)
 +                waitFor.add(blockingExecutor.submit(task));
 +        });
 +        FBUtilities.waitOnFutures(waitFor);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 674cd20,0000000..717126b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,809 -1,0 +1,809 @@@
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.CollectionType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.dht.LocalPartitioner;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.internal.composites.CompositesSearcher;
 +import org.apache.cassandra.index.internal.keys.KeysSearcher;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +/**
 + * Index implementation which indexes the values for a single column in the base
 + * table and which stores its index data in a local, hidden table.
 + */
 +public abstract class CassandraIndex implements Index
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 +
 +    public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
 +
 +    public final ColumnFamilyStore baseCfs;
 +    protected IndexMetadata metadata;
 +    protected ColumnFamilyStore indexCfs;
 +    protected ColumnDefinition indexedColumn;
 +    protected CassandraIndexFunctions functions;
 +
 +    protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        setMetadata(indexDef);
 +    }
 +
 +    /**
 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
 +     * @param indexedColumn
 +     * @param operator
 +     * @return
 +     */
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        return operator == Operator.EQ;
 +    }
 +
 +    /**
 +     * Used to construct the clustering for an entry in the index table based on values from the base data.
 +     * The clustering columns in the index table encode the values required to retrieve the correct data from the base
 +     * table and vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
 +     * Used whenever a row in the index table is written or deleted.
 +     * @param partitionKey from the base data being indexed
 +     * @param prefix from the base data being indexed
 +     * @param path from the base data being indexed
 +     * @return a clustering prefix to be used to insert into the index table
 +     */
 +    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                                           ClusteringPrefix prefix,
 +                                                           CellPath path);
 +
 +    /**
 +     * Used at search time to convert a row in the index table into a simple struct containing the values required
 +     * to retrieve the corresponding row from the base table.
 +     * @param indexedValue the partition key of the indexed table (i.e. the value that was indexed)
 +     * @param indexEntry a row from the index table
 +     * @return
 +     */
 +    public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
 +                                           Row indexEntry);
 +
 +    /**
 +     * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
 +     * Used at read time to identify out of date index entries so that they can be excluded from search results and
 +     * repaired
 +     * @param row the current row from the primary data table
 +     * @param indexValue the value we retrieved from the index
 +     * @param nowInSec
 +     * @return true if the index is out of date and the entry should be dropped
 +     */
 +    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
 +
 +    /**
 +     * Extract the value to be inserted into the index from the components of the base data
 +     * @param partitionKey from the primary data
 +     * @param clustering from the primary data
 +     * @param path from the primary data
 +     * @param cellValue from the primary data
 +     * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
 +     * key in the index table
 +     */
 +    protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                                  Clustering clustering,
 +                                                  CellPath path,
 +                                                  ByteBuffer cellValue);
 +
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return indexedColumn;
 +    }
 +
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        return indexCfs.metadata.comparator;
 +    }
 +
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return indexCfs;
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this);
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
-         // if we're just linking in the index on an already-built index post-restart
-         // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
-         return isBuilt() ? null : getBuildIndexTask();
++        // if we're just linking in the index on an already-built index post-restart or if the base
++        // table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
++        return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask();
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
 +    }
 +
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        return () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return () -> {
 +            invalidate();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        return () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +    }
 +
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
 +        functions = getFunctions(indexDef, target);
 +        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             cfm.cfName,
 +                                                             cfm,
 +                                                             baseCfs.getTracker().loadsstables);
 +        indexedColumn = target.left;
 +    }
 +
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        return () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        // built-in indexes are always included in builds initiated from SecondaryIndexManager
 +        return true;
 +    }
 +
 +    public boolean indexes(PartitionColumns columns)
 +    {
 +        // if we have indexes on the partition key or clustering columns, return true
 +        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return indexedColumn.name.equals(column.name);
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return indexedColumn.name.equals(column.name)
 +               && supportsOperator(indexedColumn, operator);
 +    }
 +
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        return supportsExpression(expression.column(), expression.operator());
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null;
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * No post processing of query results, just return them unchanged
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator;
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        return getTargetExpression(filter.getExpressions()).map(filter::without)
 +                                                           .orElse(filter);
 +    }
 +
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        return expressions.stream().filter(this::supportsExpression).findFirst();
 +    }
 +
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
 +
 +        if (target.isPresent())
 +        {
 +            target.get().validateForIndexing();
 +            switch (getIndexMetadata().kind)
 +            {
 +                case COMPOSITES:
 +                    return new CompositesSearcher(command, target.get(), this);
 +                case KEYS:
 +                    return new KeysSearcher(command, target.get(), this);
 +                default:
 +                    throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
 +                                                                  metadata.kind,
 +                                                                  metadata.name,
 +                                                                  indexedColumn.name.toString()));
 +            }
 +        }
 +
 +        return null;
 +
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                break;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                break;
 +            case REGULAR:
 +                validateRows(update);
 +                break;
 +            case STATIC:
 +                validateRows(Collections.singleton(update.staticRow()));
 +                break;
 +        }
 +    }
 +
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
 +        return new Indexer()
 +        {
 +            public void begin()
 +            {
 +            }
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            public void insertRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void finish()
 +            {
 +            }
 +
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    removeCell(clustering, cell);
 +            }
 +
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
 +                long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int ttl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell cell : row.cells())
 +                {
 +                    long cellTimestamp = cell.timestamp();
 +                    if (cell.isLive(nowInSec))
 +                    {
 +                        if (cellTimestamp > timestamp)
 +                        {
 +                            timestamp = cellTimestamp;
 +                            ttl = cell.ttl();
 +                        }
 +                    }
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes, this is called by a
 +     * searcher when it encounters a stale entry in the index
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion deletion timestamp etc
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        doDelete(indexKey, indexClustering, deletion, opGroup);
 +        logger.trace("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Called when adding a new entry to the index
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
 +        PartitionUpdate upd = partitionUpdate(valueKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.trace("Inserted entry into index for value {}", valueKey);
 +    }
 +
 +    /**
 +     * Called when deleting entries on non-primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, cell),
 +                 new DeletionTime(cell.timestamp(), nowInSec),
 +                 opGroup);
 +    }
 +
 +    /**
 +     * Called when deleting entries from indexes on primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               null));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, null),
 +                 deletion,
 +                 opGroup);
 +    }
 +
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
 +        PartitionUpdate upd = partitionUpdate(indexKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.trace("Removed index entry for value {}", indexKey);
 +    }
 +
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
 +    }
 +
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row row : update)
 +            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
 +    }
 +
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (indexedColumn.isComplex())
 +            {
 +                ComplexColumnData data = row.getComplexColumnData(indexedColumn);
 +                if (data != null)
 +                {
 +                    for (Cell cell : data)
 +                    {
 +                        validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
 +                    }
 +                }
 +            }
 +            else
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +            }
 +        }
 +    }
 +
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
 +        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
 +            throw new InvalidRequestException(String.format(
 +                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                                           value.remaining(),
 +                                                           metadata.name,
 +                                                           baseCfs.metadata.ksName,
 +                                                           baseCfs.metadata.cfName,
 +                                                           indexedColumn.name.toString(),
 +                                                           FBUtilities.MAX_UNSIGNED_SHORT));
 +    }
 +
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               cell == null ? null : cell.path(),
 +                               cell == null ? null : cell.value()
 +        );
 +    }
 +
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        return buildIndexClusteringPrefix(rowKey,
 +                                          clustering,
 +                                          cell == null ? null : cell.path()).build();
 +    }
 +
 +    private DecoratedKey getIndexKeyFor(ByteBuffer value)
 +    {
 +        return indexCfs.decorateKey(value);
 +    }
 +
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 +    }
 +
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions
 +        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
 +        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
 +        CompactionManager.instance.waitForCessation(cfss);
 +        Keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    private boolean isBuilt()
 +    {
 +        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name);
 +    }
 +
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        return indexedColumn.isPrimaryKeyColumn();
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    private void buildBlocking()
 +    {
 +        baseCfs.forceBlockingFlush();
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            metadata.name);
 +                baseCfs.indexManager.markIndexBuilt(metadata.name);
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}",
 +                        metadata.name,
 +                        getSSTableNames(sstables));
 +
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                      Collections.singleton(this),
 +                                                                      new ReducingKeyIterator(sstables));
 +            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +            FBUtilities.waitOnFuture(future);
 +            indexCfs.forceBlockingFlush();
 +            baseCfs.indexManager.markIndexBuilt(metadata.name);
 +        }
 +        logger.info("Index build of {} complete", metadata.name);
 +    }
 +
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        return StreamSupport.stream(sstables.spliterator(), false)
 +                            .map(SSTableReader::toString)
 +                            .collect(Collectors.joining(", "));
 +    }
 +
 +    /**
 +     * Construct the CFMetadata for an index table. The clustering columns in the index table
 +     * vary depending on the kind of the indexed value.
 +     * @param baseCfsMetadata
 +     * @param indexMetadata
 +     * @return
 +     */
 +    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
 +    {
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
 +        CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
 +        ColumnDefinition indexedColumn = target.left;
 +        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
 +        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
 +                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
 +                                                       .withId(baseCfsMetadata.cfId)
 +                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
 +                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
 +
 +        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
 +        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
 +        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
 +    }
 +
 +    /**
 +     * Factory method for new CassandraIndex instances
 +     * @param baseCfs
 +     * @param indexMetadata
 +     * @return
 +     */
 +    public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
 +    {
 +        return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
 +    }
 +
 +    // Public because it's also used to convert index metadata into a thrift-compatible format
 +    public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
 +                                                                       IndexMetadata indexDef)
 +    {
 +        String target = indexDef.options.get("target");
 +        assert target != null : String.format("No target definition found for index %s", indexDef.name);
 +
 +        // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
 +        // if not, then it must be a simple column name and implicitly its type is VALUES
 +        Matcher matcher = TARGET_REGEX.matcher(target);
 +        String columnName;
 +        IndexTarget.Type targetType;
 +        if (matcher.matches())
 +        {
 +            targetType = IndexTarget.Type.fromString(matcher.group(1));
 +            columnName = matcher.group(2);
 +        }
 +        else
 +        {
 +            columnName = target;
 +            targetType = IndexTarget.Type.VALUES;
 +        }
 +
 +        // in the case of a quoted column name the name in the target string
 +        // will be enclosed in quotes, which we need to unwrap. It may also
 +        // include quote characters internally, escaped like so:
 +        //      abc"def -> abc""def.
 +        // Because the target string is stored in a CQL compatible form, we
 +        // need to un-escape any such quotes to get the actual column name
 +        if (columnName.startsWith("\""))
 +        {
 +            columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
 +            columnName = columnName.replaceAll("\"\"", "\"");
 +        }
 +
 +        // if it's not a CQL table, we can't assume that the column name is utf8, so
 +        // in that case we have to do a linear scan of the cfm's columns to get the matching one
 +        if (cfm.isCQLTable())
 +            return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
 +        else
 +            for (ColumnDefinition column : cfm.allColumns())
 +                if (column.name.toString().equals(columnName))
 +                    return Pair.create(column, targetType);
 +
 +        throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
 +    }
 +
 +    static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
 +                                                Pair<ColumnDefinition, IndexTarget.Type> target)
 +    {
 +        if (indexDef.isKeys())
 +            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 +
 +        ColumnDefinition indexedColumn = target.left;
 +        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
 +        {
 +            switch (((CollectionType)indexedColumn.type).kind)
 +            {
 +                case LIST:
 +                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                case SET:
 +                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                case MAP:
 +                    switch (target.right)
 +                    {
 +                        case KEYS:
 +                            return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                        case KEYS_AND_VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
 +                        case VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                    }
 +                    throw new AssertionError();
 +            }
 +        }
 +
 +        switch (indexedColumn.kind)
 +        {
 +            case CLUSTERING:
 +                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
 +            case REGULAR:
 +                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
 +            case PARTITION_KEY:
 +                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
 +            //case COMPACT_VALUE:
 +            //    return new CompositesIndexOnCompactValue();
 +        }
 +        throw new AssertionError();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index a46366c,4211f5a..ce6eebc
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@@ -24,7 -24,8 +24,8 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 -import org.apache.cassandra.db.index.IndexNotAvailableException;
  import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.index.IndexNotAvailableException;
  
  public class MessageDeliveryTask implements Runnable
  {