You are viewing a plain-text version of this content. (The "here" link to the canonical version was lost in the plain-text conversion.)
Posted to commits@cassandra.apache.org by ad...@apache.org on 2017/06/23 10:02:11 UTC

[2/2] cassandra git commit: Improve secondary index (re)build failure and concurrency handling

Improve secondary index (re)build failure and concurrency handling

patch by Andres de la Peña; reviewed by Paulo Motta and Sergio Bossa for CASSANDRA-10130


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/679c3171
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/679c3171
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/679c3171

Branch: refs/heads/trunk
Commit: 679c31718b709f5619bba80eeb6f388484b94c3c
Parents: a1c6a62
Author: Andrés de la Peña <a....@gmail.com>
Authored: Fri Jun 23 09:30:21 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Fri Jun 23 09:30:21 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  17 +-
 .../db/compaction/CompactionManager.java        |   2 +-
 .../apache/cassandra/db/lifecycle/Tracker.java  |   8 +-
 .../cassandra/index/SecondaryIndexManager.java  | 597 +++++++++++----
 .../index/internal/CassandraIndex.java          |   2 -
 .../notifications/SSTableAddedNotification.java |  35 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   4 +-
 ...pactionStrategyManagerPendingRepairTest.java |  10 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |   6 +-
 .../apache/cassandra/index/CustomIndexTest.java |   3 +-
 .../index/SecondaryIndexManagerTest.java        | 721 +++++++++++++++++++
 .../index/internal/CassandraIndexTest.java      |   1 +
 .../index/internal/CustomCassandraIndex.java    |   2 -
 .../cassandra/index/sasi/SASIIndexTest.java     |  12 +-
 .../cassandra/stress/CompactionStress.java      |   4 +
 17 files changed, 1236 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82a0bda..ab06e2b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
  * Improve calculation of available disk space for compaction (CASSANDRA-13068)
  * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
  * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dceb41d..893d525 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -410,7 +410,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         minCompactionThreshold = new DefaultValue<>(metadata.get().params.compaction.minCompactionThreshold());
         maxCompactionThreshold = new DefaultValue<>(metadata.get().params.compaction.maxCompactionThreshold());
         crcCheckChance = new DefaultValue<>(metadata.get().params.crcCheckChance);
-        indexManager = new SecondaryIndexManager(this);
         viewManager = keyspace.viewManager.forTable(metadata.id);
         metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
@@ -455,6 +454,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // create the private ColumnFamilyStores for the secondary column indexes
+        indexManager = new SecondaryIndexManager(this);
         for (IndexMetadata info : metadata.get().indexes)
             indexManager.addIndex(info);
 
@@ -567,7 +567,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         data.dropSSTables();
         LifecycleTransaction.waitForDeletions();
-        indexManager.invalidateAllIndexesBlocking();
+        indexManager.dropAllIndexes();
 
         invalidateCaches();
     }
@@ -800,7 +800,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
         {
             data.addSSTables(newSSTables);
-            indexManager.buildAllIndexesBlocking(newSSTables);
         }
 
         logger.info("Done loading load new SSTables for {}/{}", keyspace.getName(), name);
@@ -815,14 +814,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
 
-        Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
-
-        Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL);
-        try (Refs<SSTableReader> refs = Refs.ref(sstables))
-        {
-            logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames));
-            cfs.indexManager.rebuildIndexesBlocking(refs, indexes);
-        }
+        logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames));
+        cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(idxNames)));
     }
 
     public AbstractCompactionStrategy createCompactionStrategyInstance(CompactionParams compactionParams)
@@ -1451,7 +1444,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public void addSSTable(SSTableReader sstable)
     {
         assert sstable.getColumnFamilyName().equals(name);
-        addSSTables(Arrays.asList(sstable));
+        addSSTables(Collections.singletonList(sstable));
     }
 
     public void addSSTables(Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bc8b305..d7e00da 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1619,7 +1619,7 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Is not scheduled, because it is performing disjoint work from sstable compaction.
      */
-    public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder)
+    public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder)
     {
         Runnable runnable = new Runnable()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index e2fcb06..d46ee60 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -359,7 +359,7 @@ public class Tracker
         notifyDiscarded(memtable);
 
         // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-        fail = notifyAdded(sstables, fail);
+        fail = notifyAdded(sstables, memtable, fail);
 
         if (!isDummy() && !cfstore.isValid())
             dropSSTables();
@@ -417,9 +417,9 @@ public class Tracker
         return accumulate;
     }
 
-    Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
+    Throwable notifyAdded(Iterable<SSTableReader> added, Memtable memtable, Throwable accumulate)
     {
-        INotification notification = new SSTableAddedNotification(added);
+        INotification notification = new SSTableAddedNotification(added, memtable);
         for (INotificationConsumer subscriber : subscribers)
         {
             try
@@ -436,7 +436,7 @@ public class Tracker
 
     public void notifyAdded(Iterable<SSTableReader> added)
     {
-        maybeFail(notifyAdded(added, null));
+        maybeFail(notifyAdded(added, null, null));
     }
 
     public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index f7b7d13..c2ed134 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -20,20 +20,29 @@ package org.apache.cassandra.index;
 import java.lang.reflect.Constructor;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +62,9 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.transactions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
@@ -60,6 +72,7 @@ import org.apache.cassandra.service.pager.SinglePartitionPager;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -67,7 +80,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
  * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
  * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
  * and so on.
- *
+ * <br><br>
  * The Index interface defines a number of methods which return {@code Callable<?>}. These are primarily the
  * management tasks for an index implementation. Most of them are currently executed in a blocking
  * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
@@ -76,51 +89,76 @@ import org.apache.cassandra.utils.concurrent.Refs;
  * then be defined with as void and called directly from SIM (rather than being run via the executor service).
  * Separating the task defintion from execution gives us greater flexibility though, so that in future, for example,
  * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
- *
+ * <br><br>
  * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
  * involve a significant effort, building a new index over any existing data. We perform this task asynchronously;
  * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
  * indexes is performed on the CompactionManager.
- *
+ * <br><br>
  * This class also provides instances of processors which listen to updates to the base table and forward to
  * registered Indexes the info required to keep those indexes up to date.
  * There are two variants of these processors, each with a factory method provided by SIM:
- *      IndexTransaction: deals with updates generated on the regular write path.
- *      CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
+ * IndexTransaction: deals with updates generated on the regular write path.
+ * CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
  * Further details on their usage and lifecycles can be found in the interface definitions below.
- *
- * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
+ * <br><br>
+ * The bestIndexFor method is used at query time to identify the most selective index of those able
  * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
  * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
  * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
  * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
  * a target replica.
+ * <br><br>
+ * Finally, this class provides a clear and safe lifecycle to manage index builds, either full rebuilds via
+ * {@link this#rebuildIndexesBlocking(Set)} or builds of new sstables
+ * added via {@link org.apache.cassandra.notifications.SSTableAddedNotification}s, guaranteeing
+ * the following:
+ * <ul>
+ * <li>The initialization task and any subsequent successful (re)build mark the index as built.</li>
+ * <li>If any (re)build operation fails, the index is not marked as built, and only another full rebuild can mark the
+ * index as built.</li>
+ * <li>Full rebuilds cannot be run concurrently with other full or sstable (re)builds.</li>
+ * <li>SSTable builds can always be run concurrently with any other builds.</li>
+ * </ul>
  */
-public class SecondaryIndexManager implements IndexRegistry
+public class SecondaryIndexManager implements IndexRegistry, INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 
     // default page size (in rows) when rebuilding the index for a whole partition
     public static final int DEFAULT_PAGE_SIZE = 10000;
 
-    private Map<String, Index> indexes = Maps.newConcurrentMap();
+    /**
+     * All registered indexes.
+     */
+    private final Map<String, Index> indexes = Maps.newConcurrentMap();
+
+    /**
+     * The indexes that had a build failure.
+     */
+    private final Set<String> needsFullRebuild = Sets.newConcurrentHashSet();
+
+    /**
+     * The indexes that are available for querying.
+     */
+    private final Set<String> queryableIndexes = Sets.newConcurrentHashSet();
 
     /**
-     * The indexes that are ready to server requests.
+     * The count of pending index builds for each index.
      */
-    private Set<String> builtIndexes = Sets.newConcurrentHashSet();
+    private final Map<String, AtomicInteger> inProgressBuilds = Maps.newConcurrentMap();
 
     // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
-    private static final ExecutorService asyncExecutor =
-        new JMXEnabledThreadPoolExecutor(1,
-                                         StageManager.KEEPALIVE,
-                                         TimeUnit.SECONDS,
-                                         new LinkedBlockingQueue<>(),
-                                         new NamedThreadFactory("SecondaryIndexManagement"),
-                                         "internal");
+    private static final ListeningExecutorService asyncExecutor = MoreExecutors.listeningDecorator(
+    new JMXEnabledThreadPoolExecutor(1,
+                                     StageManager.KEEPALIVE,
+                                     TimeUnit.SECONDS,
+                                     new LinkedBlockingQueue<>(),
+                                     new NamedThreadFactory("SecondaryIndexManagement"),
+                                     "internal"));
 
     // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
-    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
+    private static final ListeningExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
 
     /**
      * The underlying column family containing the source data for these indexes
@@ -130,9 +168,9 @@ public class SecondaryIndexManager implements IndexRegistry
     public SecondaryIndexManager(ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
+        baseCfs.getTracker().subscribe(this);
     }
 
-
     /**
      * Drops and adds new indexes associated with the underlying CF
      */
@@ -160,27 +198,63 @@ public class SecondaryIndexManager implements IndexRegistry
                : blockingExecutor.submit(reloadTask);
     }
 
-    private Future<?> createIndex(IndexMetadata indexDef)
+    @SuppressWarnings("unchecked")
+    private synchronized Future<?> createIndex(IndexMetadata indexDef)
     {
-        Index index = createInstance(indexDef);
+        final Index index = createInstance(indexDef);
+        String indexName = index.getIndexMetadata().name;
         index.register(this);
 
+        // now mark as building prior to initializing
+        markIndexesBuilding(ImmutableSet.of(index), true);
+
+        Callable<?> initialBuildTask = null;
         // if the index didn't register itself, we can probably assume that no initialization needs to happen
-        final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name)
-                                           ? index.getInitializationTask()
-                                           : null;
+        if (indexes.containsKey(indexDef.name))
+        {
+            try
+            {
+                initialBuildTask = index.getInitializationTask();
+            }
+            catch (Throwable t)
+            {
+                logAndMarkIndexesFailed(Collections.singleton(index), t);
+                throw t;
+            }
+        }
+
+        // if there's no initialization, just mark as built and return:
         if (initialBuildTask == null)
         {
-            // We need to make sure that the index is marked as built in the case where the initialBuildTask
-            // does not need to be run (if the index didn't register itself or if the base table was empty).
-            markIndexBuilt(indexDef.name);
+            markIndexBuilt(index, true);
             return Futures.immediateFuture(null);
         }
-        return asyncExecutor.submit(index.getInitializationTask());
+
+        // otherwise run the initialization task asynchronously with a callback to mark it built or failed
+        final SettableFuture initialization = SettableFuture.create();
+        Futures.addCallback(asyncExecutor.submit(initialBuildTask), new FutureCallback()
+        {
+            @Override
+            public void onFailure(Throwable t)
+            {
+                logAndMarkIndexesFailed(Collections.singleton(index), t);
+                initialization.setException(t);
+            }
+
+            @Override
+            public void onSuccess(Object o)
+            {
+                markIndexBuilt(index, true);
+                initialization.set(o);
+            }
+        }, MoreExecutors.directExecutor());
+
+        return initialization;
     }
 
     /**
      * Adds and builds a index
+     *
      * @param indexDef the IndexMetadata describing the index
      */
     public synchronized Future<?> addIndex(IndexMetadata indexDef)
@@ -195,11 +269,11 @@ public class SecondaryIndexManager implements IndexRegistry
      * Checks if the specified index is queryable.
      *
      * @param index the index
-     * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
+     * @return <code>true</code> if the specified index is registered, <code>false</code> otherwise
      */
     public boolean isIndexQueryable(Index index)
     {
-        return builtIndexes.contains(index.getIndexMetadata().name);
+        return queryableIndexes.contains(index.getIndexMetadata().name);
     }
 
     public synchronized void removeIndex(String indexName)
@@ -208,7 +282,7 @@ public class SecondaryIndexManager implements IndexRegistry
         if (null != index)
         {
             markIndexRemoved(indexName);
-            executeBlocking(index.getInvalidateTask());
+            executeBlocking(index.getInvalidateTask(), null);
         }
     }
 
@@ -231,57 +305,35 @@ public class SecondaryIndexManager implements IndexRegistry
      */
     public void markAllIndexesRemoved()
     {
-       getBuiltIndexNames().forEach(this::markIndexRemoved);
+        getBuiltIndexNames().forEach(this::markIndexRemoved);
     }
 
     /**
-    * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
-    * Caller must acquire and release references to the sstables used here.
-    * Note also that only this method of (re)building indexes:
-    *   a) takes a set of index *names* rather than Indexers
-    *   b) marks exsiting indexes removed prior to rebuilding
-    *
-    * @param sstables the data to build from
-    * @param indexNames the list of indexes to be rebuilt
-    */
-    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
-    {
-        Set<Index> toRebuild = indexes.values().stream()
-                                               .filter(index -> indexNames.contains(index.getIndexMetadata().name))
-                                               .filter(Index::shouldBuildBlocking)
-                                               .collect(Collectors.toSet());
-        if (toRebuild.isEmpty())
-        {
-            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
-            return;
-        }
-
-        toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name));
-
-        buildIndexesBlocking(sstables, toRebuild);
-
-        toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name));
-    }
-
-    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
-    {
-        buildIndexesBlocking(sstables, indexes.values()
-                                              .stream()
-                                              .filter(Index::shouldBuildBlocking)
-                                              .collect(Collectors.toSet()));
-    }
-
-    // For convenience, may be called directly from Index impls
-    public void buildIndexBlocking(Index index)
+     * Does a blocking full rebuild of the specifed indexes from all the sstables in the base table.
+     * Note also that this method of (re)building indexes:
+     * a) takes a set of index *names* rather than Indexers
+     * b) marks existing indexes removed prior to rebuilding
+     * c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run
+     * concurrently
+     *
+     * @param indexNames the list of indexes to be rebuilt
+     */
+    public void rebuildIndexesBlocking(Set<String> indexNames)
     {
-        if (index.shouldBuildBlocking())
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
+             Refs<SSTableReader> allSSTables = viewFragment.refs)
         {
-            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
-                 Refs<SSTableReader> sstables = viewFragment.refs)
+            Set<Index> toRebuild = indexes.values().stream()
+                                          .filter(index -> indexNames.contains(index.getIndexMetadata().name))
+                                          .filter(Index::shouldBuildBlocking)
+                                          .collect(Collectors.toSet());
+            if (toRebuild.isEmpty())
             {
-                buildIndexesBlocking(sstables, Collections.singleton(index));
-                markIndexBuilt(index.getIndexMetadata().name);
+                logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
+                return;
             }
+
+            buildIndexesBlocking(allSSTables, toRebuild, true);
         }
     }
 
@@ -356,55 +408,255 @@ public class SecondaryIndexManager implements IndexRegistry
         return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
     }
 
-    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
+    /**
+     * Performs a blocking (re)indexing of the specified SSTables for the specified indexes.
+     *
+     * @param sstables      the SSTables to be (re)indexed
+     * @param indexes       the indexes to be (re)built for the specified SSTables
+     * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
+     */
+    @SuppressWarnings({ "unchecked" })
+    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes, boolean isFullRebuild)
     {
         if (indexes.isEmpty())
             return;
 
-        logger.info("Submitting index build of {} for data in {}",
-                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
-                    sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+        // Mark all indexes as building: this step must happen first, because if any index can't be marked, the whole
+        // process needs to abort
+        markIndexesBuilding(indexes, isFullRebuild);
+
+        // Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed:
+        final Set<Index> builtIndexes = new HashSet<>();
+        final Set<Index> unbuiltIndexes = new HashSet<>();
 
-        Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
-        for (Index index : indexes)
+        // Any exception thrown during index building that could be suppressed by the finally block
+        Exception accumulatedFail = null;
+
+        try
         {
-            Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
-            stored.add(index);
+            logger.info("Submitting index build of {} for data in {}",
+                        indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
+                        sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+
+            // Group all building tasks
+            Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
+            for (Index index : indexes)
+            {
+                Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+                stored.add(index);
+            }
+
+            // Schedule all index building tasks with a callback to mark them as built or failed
+            List<Future<?>> futures = new ArrayList<>(byType.size());
+            byType.forEach((buildingSupport, groupedIndexes) ->
+                           {
+                               SecondaryIndexBuilder builder = buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables);
+                               final SettableFuture build = SettableFuture.create();
+                               Futures.addCallback(CompactionManager.instance.submitIndexBuild(builder), new FutureCallback()
+                               {
+                                   @Override
+                                   public void onFailure(Throwable t)
+                                   {
+                                       logAndMarkIndexesFailed(groupedIndexes, t);
+                                       unbuiltIndexes.addAll(groupedIndexes);
+                                       build.setException(t);
+                                   }
+
+                                   @Override
+                                   public void onSuccess(Object o)
+                                   {
+                                       groupedIndexes.forEach(i -> markIndexBuilt(i, isFullRebuild));
+                                       logger.info("Index build of {} completed", getIndexNames(groupedIndexes));
+                                       builtIndexes.addAll(groupedIndexes);
+                                       build.set(o);
+                                   }
+                               });
+                               futures.add(build);
+                           });
+
+            // Finally wait for the index builds to finish and flush the indexes that built successfully
+            FBUtilities.waitOnFutures(futures);
         }
+        catch (Exception e)
+        {
+            accumulatedFail = e;
+            throw e;
+        }
+        finally
+        {
+            try
+            {
+                // Mark as failed any indexes that were marked neither as built nor as failed (e.g. the build was aborted)
+                Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes));
+                if (!failedIndexes.isEmpty())
+                {
+                    logAndMarkIndexesFailed(failedIndexes, accumulatedFail);
+                }
 
-        List<Future<?>> futures = byType.entrySet()
-                                        .stream()
-                                        .map((e) -> e.getKey().getIndexBuildTask(baseCfs, e.getValue(), sstables))
-                                        .map(CompactionManager.instance::submitIndexBuild)
-                                        .collect(Collectors.toList());
+                // Flush all built indexes with an asynchronous callback to log the success or failure of the flush
+                flushIndexesBlocking(builtIndexes, new FutureCallback()
+                {
+                    String indexNames = StringUtils.join(builtIndexes.stream()
+                                                                     .map(i -> i.getIndexMetadata().name)
+                                                                     .collect(Collectors.toList()), ',');
 
-        FBUtilities.waitOnFutures(futures);
+                    @Override
+                    public void onFailure(Throwable ignored)
+                    {
+                        logger.info("Index flush of {} failed", indexNames);
+                    }
 
-        flushIndexesBlocking(indexes);
-        logger.info("Index build of {} complete",
-                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")));
+                    @Override
+                    public void onSuccess(Object ignored)
+                    {
+                        logger.info("Index flush of {} completed", indexNames);
+                    }
+                });
+            }
+            catch (Exception e)
+            {
+                if (accumulatedFail != null)
+                {
+                    accumulatedFail.addSuppressed(e);
+                }
+                else
+                {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private String getIndexNames(Set<Index> indexes)
+    {
+        List<String> indexNames = indexes.stream()
+                                         .map(i -> i.getIndexMetadata().name)
+                                         .collect(Collectors.toList());
+        return StringUtils.join(indexNames, ',');
     }
 
     /**
-     * Marks the specified index as build.
-     * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
-     * @param indexName the index name
+     * Marks the specified indexes as (re)building if:
+     * 1) There's no in progress rebuild of any of the given indexes.
+     * 2) There's an in progress rebuild but the caller is not a full rebuild.
+     * <p>
+     * Otherwise, this method invocation fails, as it is not possible to run full rebuilds while other concurrent rebuilds
+     * are in progress. Please note this is checked atomically against all given indexes; that is, no index will be marked
+     * if even a single one fails.
+     * <p>
+     * Marking an index as "building" practically means:
+     * 1) The index is removed from the "failed" set if this is a full rebuild.
+     * 2) The index is removed from the system keyspace built indexes.
+     * <p>
+     * Thread safety is guaranteed by having all methods managing index builds synchronized: being synchronized on
+     * the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
+     * lock, but this is fine as the work done while holding such lock is trivial.
+     * <p>
+     * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should always be called after the
+     * rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
+     *
+     * @param indexes       the indexes to be marked as building
+     * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
      */
-    public void markIndexBuilt(String indexName)
+    private synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullRebuild)
     {
-        builtIndexes.add(indexName);
-        if (DatabaseDescriptor.isDaemonInitialized())
-            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
+        String keyspaceName = baseCfs.keyspace.getName();
+
+        // First step is to validate against concurrent rebuilds; it would be more optimized to do everything on a single
+        // step, but we're not really expecting a very high number of indexes, and this isn't on any hot path, so
+        // we're favouring readability over performance
+        indexes.forEach(index ->
+                        {
+                            String indexName = index.getIndexMetadata().name;
+                            AtomicInteger counter = inProgressBuilds.computeIfAbsent(indexName, ignored -> new AtomicInteger(0));
+
+                            if (counter.get() > 0 && isFullRebuild)
+                                throw new IllegalStateException(String.format("Cannot rebuild index %s as another index build for the same index is currently in progress.", indexName));
+                        });
+
+        // Second step is the actual marking:
+        indexes.forEach(index ->
+                        {
+                            String indexName = index.getIndexMetadata().name;
+                            AtomicInteger counter = inProgressBuilds.computeIfAbsent(indexName, ignored -> new AtomicInteger(0));
+
+                            if (isFullRebuild)
+                                needsFullRebuild.remove(indexName);
+
+                            if (counter.getAndIncrement() == 0 && DatabaseDescriptor.isDaemonInitialized())
+                                SystemKeyspace.setIndexRemoved(keyspaceName, indexName);
+                        });
+    }
+
+    /**
+     * Decrements the in-progress build count of the specified index and, once it reaches zero, marks it as built unless it needs a full rebuild.
+     * {@link #markIndexesBuilding(Set, boolean)} should always be invoked before this method.
+     *
+     * @param index the index to be marked as built
+     * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
+     */
+    private synchronized void markIndexBuilt(Index index, boolean isFullRebuild)
+    {
+        String indexName = index.getIndexMetadata().name;
+        AtomicInteger counter = inProgressBuilds.get(indexName);
+        if (counter != null)
+        {
+            assert counter.get() > 0;
+            if (counter.decrementAndGet() == 0)
+            {
+                if (isFullRebuild)
+                    queryableIndexes.add(indexName);
+                inProgressBuilds.remove(indexName);
+                if (!needsFullRebuild.contains(indexName) && DatabaseDescriptor.isDaemonInitialized())
+                    SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
+            }
+        }
+    }
+
+    /**
+     * Marks the specified index as failed.
+     * {@link #markIndexesBuilding(Set, boolean)} should always be invoked before this method.
+     *
+     * @param index the index to be marked as failed
+     */
+    private synchronized void markIndexFailed(Index index)
+    {
+        String indexName = index.getIndexMetadata().name;
+        AtomicInteger counter = inProgressBuilds.get(indexName);
+        if (counter != null)
+        {
+            assert counter.get() > 0;
+
+            counter.decrementAndGet();
+
+            if (DatabaseDescriptor.isDaemonInitialized())
+                SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
+
+            needsFullRebuild.add(indexName);
+        }
+    }
+
+    private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure)
+    {
+        JVMStabilityInspector.inspectThrowable(indexBuildFailure);
+        if (indexBuildFailure != null)
+            logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure);
+        else
+            logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes));
+        indexes.forEach(SecondaryIndexManager.this::markIndexFailed);
     }
 
     /**
      * Marks the specified index as removed.
-     * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
+     *
      * @param indexName the index name
      */
-    public void markIndexRemoved(String indexName)
+    private synchronized void markIndexRemoved(String indexName)
     {
         SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
+        queryableIndexes.remove(indexName);
+        needsFullRebuild.remove(indexName);
+        inProgressBuilds.remove(indexName);
     }
 
     public Index getIndexByName(String indexName)
@@ -419,7 +671,7 @@ public class SecondaryIndexManager implements IndexRegistry
         {
             assert indexDef.options != null;
             String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
-            assert ! Strings.isNullOrEmpty(className);
+            assert !Strings.isNullOrEmpty(className);
             try
             {
                 Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
@@ -443,16 +695,22 @@ public class SecondaryIndexManager implements IndexRegistry
      */
     public void truncateAllIndexesBlocking(final long truncatedAt)
     {
-        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
+        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt), null);
     }
 
     /**
      * Remove all indexes
      */
-    public void invalidateAllIndexesBlocking()
+    public void dropAllIndexes()
     {
         markAllIndexesRemoved();
-        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
+        invalidateAllIndexesBlocking();
+    }
+
+    @VisibleForTesting
+    public void invalidateAllIndexesBlocking()
+    {
+        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask, null);
     }
 
     /**
@@ -460,7 +718,7 @@ public class SecondaryIndexManager implements IndexRegistry
      */
     public void flushAllIndexesBlocking()
     {
-       flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
+        flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
     }
 
     /**
@@ -468,24 +726,7 @@ public class SecondaryIndexManager implements IndexRegistry
      */
     public void flushIndexesBlocking(Set<Index> indexes)
     {
-        if (indexes.isEmpty())
-            return;
-
-        List<Future<?>> wait = new ArrayList<>();
-        List<Index> nonCfsIndexes = new ArrayList<>();
-
-        // for each CFS backed index, submit a flush task which we'll wait on for completion
-        // for the non-CFS backed indexes, we'll flush those while we wait.
-        synchronized (baseCfs.getTracker())
-        {
-            indexes.forEach(index ->
-                index.getBackingTable()
-                     .map(cfs -> wait.add(cfs.forceFlush()))
-                     .orElseGet(() -> nonCfsIndexes.add(index)));
-        }
-
-        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
-        FBUtilities.waitOnFutures(wait);
+        flushIndexesBlocking(indexes, null);
     }
 
     /**
@@ -496,7 +737,7 @@ public class SecondaryIndexManager implements IndexRegistry
         executeAllBlocking(indexes.values()
                                   .stream()
                                   .filter(index -> !index.getBackingTable().isPresent()),
-                           Index::getBlockingFlushTask);
+                           Index::getBlockingFlushTask, null);
     }
 
     /**
@@ -505,9 +746,32 @@ public class SecondaryIndexManager implements IndexRegistry
     public void executePreJoinTasksBlocking(boolean hadBootstrap)
     {
         logger.info("Executing pre-join{} tasks for: {}", hadBootstrap ? " post-bootstrap" : "", this.baseCfs);
-        executeAllBlocking(indexes.values().stream(), (index) -> {
+        executeAllBlocking(indexes.values().stream(), (index) ->
+        {
             return index.getPreJoinTask(hadBootstrap);
-        });
+        }, null);
+    }
+
+    private void flushIndexesBlocking(Set<Index> indexes, FutureCallback<Object> callback)
+    {
+        if (indexes.isEmpty())
+            return;
+
+        List<Future<?>> wait = new ArrayList<>();
+        List<Index> nonCfsIndexes = new ArrayList<>();
+
+        // for each CFS backed index, submit a flush task which we'll wait on for completion
+        // for the non-CFS backed indexes, we'll flush those while we wait.
+        synchronized (baseCfs.getTracker())
+        {
+            indexes.forEach(index ->
+                            index.getBackingTable()
+                                 .map(cfs -> wait.add(cfs.forceFlush()))
+                                 .orElseGet(() -> nonCfsIndexes.add(index)));
+        }
+
+        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask, callback);
+        FBUtilities.waitOnFutures(wait);
     }
 
     /**
@@ -517,8 +781,8 @@ public class SecondaryIndexManager implements IndexRegistry
     {
         Set<String> allIndexNames = new HashSet<>();
         indexes.values().stream()
-                .map(i -> i.getIndexMetadata().name)
-                .forEach(allIndexNames::add);
+               .map(i -> i.getIndexMetadata().name)
+               .forEach(allIndexNames::add);
         return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
     }
 
@@ -566,7 +830,8 @@ public class SecondaryIndexManager implements IndexRegistry
                     if (!page.hasNext())
                         break;
 
-                    try (UnfilteredRowIterator partition = page.next()) {
+                    try (UnfilteredRowIterator partition = page.next())
+                    {
                         Set<Index.Indexer> indexers = indexes.stream()
                                                              .map(index -> index.indexerFor(key,
                                                                                             partition.columns(),
@@ -665,7 +930,7 @@ public class SecondaryIndexManager implements IndexRegistry
     /**
      * Delete all data from all indexes for this partition.
      * For when cleanup rips a partition out entirely.
-     *
+     * <p>
      * TODO : improve cleanup transaction to batch updates and perform them async
      */
     public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
@@ -690,28 +955,28 @@ public class SecondaryIndexManager implements IndexRegistry
                                                      partition.columns(),
                                                      nowInSec);
             indexTransaction.start();
-            indexTransaction.onRowDelete((Row)unfiltered);
+            indexTransaction.onRowDelete((Row) unfiltered);
             indexTransaction.commit();
         }
     }
 
     /**
      * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
-     *
+     * <p>
      * This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces
      * the search space the most.
-     *
+     * <p>
      * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
      * specify are automatically included. Following that, the registered indexes are filtered to include only those
      * which support the standard expressions in the RowFilter.
-     *
+     * <p>
      * The filtered set then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows
      * method.
-     *
+     * <p>
      * Implementation specific validation of the target expression, either custom or standard, by the selected
      * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
      * the validity of the expression.
-     *
+     * <p>
      * This method is only called once during the lifecycle of a ReadCommand and the result is
      * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
      * ReadOrderGroup, or an estimate of the result size from an average index query.
@@ -732,7 +997,7 @@ public class SecondaryIndexManager implements IndexRegistry
             {
                 // Only a single custom expression is allowed per query and, if present,
                 // we want to always favour the index specified in such an expression
-                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
+                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression) expression;
                 logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
                 Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
                 return indexes.get(customExpression.getTargetIndex().name);
@@ -781,6 +1046,7 @@ public class SecondaryIndexManager implements IndexRegistry
      * will process it. The partition key as well as the clustering and
      * cell values for each row in the update may be checked by index
      * implementations
+     *
      * @param update PartitionUpdate containing the values to be validated by registered Index implementations
      * @throws InvalidRequestException
      */
@@ -808,9 +1074,7 @@ public class SecondaryIndexManager implements IndexRegistry
     private Index unregisterIndex(String name)
     {
         Index removed = indexes.remove(name);
-        builtIndexes.remove(name);
-        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
-                     name);
+        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry", name);
         return removed;
     }
 
@@ -882,7 +1146,7 @@ public class SecondaryIndexManager implements IndexRegistry
     {
         private final Index.Indexer[] indexers;
 
-        private WriteTimeTransaction(Index.Indexer...indexers)
+        private WriteTimeTransaction(Index.Indexer... indexers)
         {
             // don't allow null indexers, if we don't need any use a NullUpdater object
             for (Index.Indexer indexer : indexers) assert indexer != null;
@@ -945,7 +1209,6 @@ public class SecondaryIndexManager implements IndexRegistry
 
                     if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
                         toRemove.addCell(original);
-
                 }
             };
             Rows.diff(diffListener, updated, existing);
@@ -1011,7 +1274,7 @@ public class SecondaryIndexManager implements IndexRegistry
                 rows = new Row[versions];
         }
 
-        public void onRowMerge(Row merged, Row...versions)
+        public void onRowMerge(Row merged, Row... versions)
         {
             // Diff listener constructs rows representing deltas between the merged and original versions
             // These delta rows are then passed to registered indexes for removal processing
@@ -1051,7 +1314,7 @@ public class SecondaryIndexManager implements IndexRegistry
 
             Rows.diff(diffListener, merged, versions);
 
-            for(int i = 0; i < builders.length; i++)
+            for (int i = 0; i < builders.length; i++)
                 if (builders[i] != null)
                     rows[i] = builders[i].build();
         }
@@ -1147,13 +1410,17 @@ public class SecondaryIndexManager implements IndexRegistry
         }
     }
 
-    private static void executeBlocking(Callable<?> task)
+    private void executeBlocking(Callable<?> task, FutureCallback<Object> callback)
     {
         if (null != task)
-            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+        {
+            ListenableFuture<?> f = blockingExecutor.submit(task);
+            if (callback != null) Futures.addCallback(f, callback);
+            FBUtilities.waitOnFuture(f);
+        }
     }
 
-    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
+    private void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function, FutureCallback<Object> callback)
     {
         if (function == null)
         {
@@ -1162,11 +1429,33 @@ public class SecondaryIndexManager implements IndexRegistry
         }
 
         List<Future<?>> waitFor = new ArrayList<>();
-        indexers.forEach(indexer -> {
-            Callable<?> task = function.apply(indexer);
-            if (null != task)
-                waitFor.add(blockingExecutor.submit(task));
-        });
+        indexers.forEach(indexer ->
+                         {
+                             Callable<?> task = function.apply(indexer);
+                             if (null != task)
+                             {
+                                 ListenableFuture<?> f = blockingExecutor.submit(task);
+                                 if (callback != null) Futures.addCallback(f, callback);
+                                 waitFor.add(f);
+                             }
+                         });
         FBUtilities.waitOnFutures(waitFor);
     }
+
+    public void handleNotification(INotification notification, Object sender)
+    {
+        if (!indexes.isEmpty() && notification instanceof SSTableAddedNotification)
+        {
+            SSTableAddedNotification notice = (SSTableAddedNotification) notification;
+
+            // SSTables associated with a memtable come from a flush, so their contents have already been indexed
+            if (!notice.memtable().isPresent())
+                buildIndexesBlocking(Lists.newArrayList(notice.added),
+                                     indexes.values()
+                                            .stream()
+                                            .filter(Index::shouldBuildBlocking)
+                                            .collect(Collectors.toSet()),
+                                     false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index c6f7d98..c7f3536 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -699,7 +699,6 @@ public abstract class CassandraIndex implements Index
                             baseCfs.metadata.keyspace,
                             baseCfs.metadata.name,
                             metadata.name);
-                baseCfs.indexManager.markIndexBuilt(metadata.name);
                 return;
             }
 
@@ -713,7 +712,6 @@ public abstract class CassandraIndex implements Index
             Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
             FBUtilities.waitOnFuture(future);
             indexCfs.forceBlockingFlush();
-            baseCfs.indexManager.markIndexBuilt(metadata.name);
         }
         logger.info("Index build of {} complete", metadata.name);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
index 56d6130..9c95a18 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
@@ -17,13 +17,46 @@
  */
 package org.apache.cassandra.notifications;
 
+import java.util.Optional;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 
+/**
+ * Notification sent after SSTables are added to their {@link org.apache.cassandra.db.ColumnFamilyStore}.
+ */
 public class SSTableAddedNotification implements INotification
 {
+    /** The added SSTables */
     public final Iterable<SSTableReader> added;
-    public SSTableAddedNotification(Iterable<SSTableReader> added)
+
+    /** The memtable from which the tables come when they have been added due to a flush, {@code null} otherwise. */
+    @Nullable
+    private final Memtable memtable;
+
+    /**
+     * Creates a new {@code SSTableAddedNotification} for the specified SSTables and optional memtable.
+     *
+     * @param added    the added SSTables
+     * @param memtable the memtable from which the tables come when they have been added due to a memtable flush,
+     *                 or {@code null} if they don't come from a flush
+     */
+    public SSTableAddedNotification(Iterable<SSTableReader> added, @Nullable Memtable memtable)
     {
         this.added = added;
+        this.memtable = memtable;
+    }
+
+    /**
+     * Returns the memtable from which the tables come when they have been added due to a memtable flush, or an empty
+     * Optional if they were not added due to a flush.
+     *
+     * @return the origin memtable in case of a flush, {@link Optional#empty()} otherwise
+     */
+    public Optional<Memtable> memtable()
+    {
+        return Optional.ofNullable(memtable);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 34e7cc8..925dc85 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -231,9 +230,8 @@ public class StreamReceiveTask extends StreamTask
                     {
                         task.finishTransaction();
 
-                        // add sstables and build secondary indexes
+                        // add sstables (this will build secondary indexes too, see CASSANDRA-10130)
                         cfs.addSSTables(readers);
-                        cfs.indexManager.buildAllIndexesBlocking(readers);
 
                         //invalidate row and counter cache
                         if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index 0b27f73..af629e5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -87,7 +87,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
 
         // add the sstable
-        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         Assert.assertFalse(repairedContains(sstable));
         Assert.assertFalse(unrepairedContains(sstable));
         csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
@@ -181,7 +181,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
 
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
-        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         Assert.assertTrue(pendingContains(repairID, sstable));
 
         // delete sstable
@@ -210,7 +210,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
 
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
-        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
 
         strategies = csm.getStrategies();
         Assert.assertEquals(3, strategies.size());
@@ -228,7 +228,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
-        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         LocalSessionAccessor.finalizeUnsafe(repairID);
         csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
         Assert.assertNotNull(pendingContains(repairID, sstable));
@@ -265,7 +265,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
         mutateRepaired(sstable, repairID);
-        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         LocalSessionAccessor.failUnsafe(repairID);
 
         csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 360a2cd..624f119 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -378,7 +378,7 @@ public class LeveledCompactionStrategyTest
         assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
 
         unrepaired.removeSSTable(sstable2);
-        manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
+        manager.handleNotification(new SSTableAddedNotification(singleton(sstable2), null), this);
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index a5a1baf..93ee198 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.lifecycle;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -305,6 +306,7 @@ public class TrackerTest
         Assert.assertEquals(2, listener.received.size());
         Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable);
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
+        Assert.assertEquals(Optional.of(prev2), ((SSTableAddedNotification) listener.received.get(1)).memtable());
         listener.received.clear();
         Assert.assertTrue(reader.isKeyCacheSetup());
         Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@ -326,6 +328,7 @@ public class TrackerTest
         Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable);
         Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(2)).added);
+        Assert.assertEquals(Optional.of(prev1), ((SSTableAddedNotification) listener.received.get(2)).memtable());
         Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification);
         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size());
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -360,8 +363,9 @@ public class TrackerTest
         MockListener failListener = new MockListener(true);
         tracker.subscribe(failListener);
         tracker.subscribe(listener);
-        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null));
+        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null, null));
         Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertFalse(((SSTableAddedNotification) listener.received.get(0)).memtable().isPresent());
         listener.received.clear();
         Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
         Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index ed999fa..d14b50d 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -111,7 +112,7 @@ public class CustomIndexTest extends CQLTester
         excluded.reset();
         assertTrue(excluded.rowsInserted.isEmpty());
 
-        indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
+        indexManager.rebuildIndexesBlocking(Sets.newHashSet(toInclude, toExclude));
 
         assertEquals(3, included.rowsInserted.size());
         assertTrue(excluded.rowsInserted.isEmpty());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org