You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/05/29 11:39:27 UTC

[cassandra] branch trunk updated: Improve handling of 2i initialization failures

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 595dc61  Improve handling of 2i initialization failures
595dc61 is described below

commit 595dc61290a9fda15b6765711141039ec9609bb3
Author: Bereng <be...@gmail.com>
AuthorDate: Fri May 29 12:33:50 2020 +0100

    Improve handling of 2i initialization failures
    
    patch by Berenguer Blasi; reviewed by Andres de la Peña for CASSANDRA-13606
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/index/Index.java     |  39 +-
 .../cassandra/index/SecondaryIndexManager.java     | 155 +++++---
 .../cassandra/index/internal/CassandraIndex.java   |   3 +-
 .../org/apache/cassandra/index/sasi/SASIIndex.java |   1 -
 .../validation/entities/SecondaryIndexTest.java    | 139 +++++++
 .../apache/cassandra/db/SecondaryIndexTest.java    |   2 +-
 .../cassandra/index/SecondaryIndexManagerTest.java | 400 ++++++++++++++-------
 .../unit/org/apache/cassandra/index/StubIndex.java |   1 -
 9 files changed, 553 insertions(+), 188 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9b62b06..7fbd0b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Improve handling of 2i initialization failures (CASSANDRA-13606)
  * Add completion_ratio column to sstable_tasks virtual table (CASANDRA-15759)
  * Add support for adding custom Verbs (CASSANDRA-15725)
  * Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index b9f38ce..6d716be 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations,
@@ -136,6 +135,23 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
  */
 public interface Index
 {
+    /**
+     * Supported loads. For example, an index that was badly initialized might support only reads.
+     */
+    public enum LoadType
+    {
+        READ, WRITE, ALL, NOOP;
+
+        public boolean supportsWrites()
+        {
+            return this == ALL || this == WRITE;
+        }
+
+        public boolean supportsReads()
+        {
+            return this == ALL || this == READ;
+        }
+    }
 
     /*
      * Helpers for building indexes from SSTable data
@@ -180,13 +196,32 @@ public interface Index
      * single pass through the data. The singleton instance returned from the default method implementation builds
      * indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data.
      *
-     * @return an instance of the index build taski helper. Index implementations which return <b>the same instance</b>
+     * @return an instance of the index build task helper. Index implementations which return <b>the same instance</b>
      * will be built using a single task.
      */
     default IndexBuildingSupport getBuildTaskSupport()
     {
         return INDEX_BUILDER_SUPPORT;
     }
+    
+    /**
+     * Same as {@code getBuildTaskSupport} but can be overridden with specific 'recovery' logic different from the index building one
+     */
+    default IndexBuildingSupport getRecoveryTaskSupport()
+    {
+        return getBuildTaskSupport();
+    }
+    
+    /**
+     * Returns the type of operations supported by the index when its build has failed and it needs recovery.
+     *
+     * @param isInitialBuild {@code true} if the failure is for the initial build task on index creation, {@code false}
+     * if the failure is for a full rebuild or recovery.
+     */
+    default LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+    {
+        return isInitialBuild ? LoadType.WRITE : LoadType.ALL;
+    }
 
     /**
      * Return a task to perform any initialization work when a new index instance is created.
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 7f28c44..3822549 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index.IndexBuildingSupport;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.transactions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -144,6 +145,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
      * The indexes that are available for querying.
      */
     private final Set<String> queryableIndexes = Sets.newConcurrentHashSet();
+    
+    /**
+     * The indexes that are available for writing.
+     */
+    private final Map<String, Index> writableIndexes = Maps.newConcurrentMap();
 
     /**
      * The count of pending index builds for each index.
@@ -207,6 +213,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
     {
         final Index index = createInstance(indexDef);
         index.register(this);
+        if (writableIndexes.put(index.getIndexMetadata().name, index) == null)
+            logger.info("Index [{}] registered and writable.", index.getIndexMetadata().name);
 
         markIndexesBuilding(ImmutableSet.of(index), true, isNewCF);
 
@@ -220,7 +228,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
             }
             catch (Throwable t)
             {
-                logAndMarkIndexesFailed(Collections.singleton(index), t);
+                logAndMarkIndexesFailed(Collections.singleton(index), t, true);
                 throw t;
             }
         }
@@ -239,7 +247,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
             @Override
             public void onFailure(Throwable t)
             {
-                logAndMarkIndexesFailed(Collections.singleton(index), t);
+                logAndMarkIndexesFailed(Collections.singleton(index), t, true);
                 initialization.setException(t);
             }
 
@@ -273,12 +281,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
      * Checks if the specified index is queryable.
      *
      * @param index the index
-     * @return <code>true</code> if the specified index is registered, <code>false</code> otherwise
+     * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
      */
     public boolean isIndexQueryable(Index index)
     {
         return queryableIndexes.contains(index.getIndexMetadata().name);
     }
+    
+    /**
+     * Checks if the specified index is writable.
+     *
+     * @param index the index
+     * @return <code>true</code> if the specified index is writable, <code>false</code> otherwise
+     */
+    public boolean isIndexWritable(Index index)
+    {
+        return writableIndexes.containsKey(index.getIndexMetadata().name);
+    }
 
     /**
      * Checks if the specified index has any running build task.
@@ -326,8 +345,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
     }
 
     /**
-     * Does a blocking full rebuild of the specifed indexes from all the sstables in the base table.
-     * Note also that this method of (re)building indexes:
+     * Does a blocking full rebuild/recovery of the specified indexes from all the sstables in the base table.
+     * Note also that this method of (re)building/recovering indexes:
      * a) takes a set of index *names* rather than Indexers
      * b) marks existing indexes removed prior to rebuilding
      * c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run
@@ -337,19 +356,40 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
      */
     public void rebuildIndexesBlocking(Set<String> indexNames)
     {
-        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
-             Refs<SSTableReader> allSSTables = viewFragment.refs)
+        // Get the set of indexes that require blocking build
+        Set<Index> toRebuild = indexes.values()
+                                      .stream()
+                                      .filter(index -> indexNames.contains(index.getIndexMetadata().name))
+                                      .filter(Index::shouldBuildBlocking)
+                                      .collect(Collectors.toSet());
+
+        if (toRebuild.isEmpty())
         {
-            Set<Index> toRebuild = indexes.values().stream()
-                                          .filter(index -> indexNames.contains(index.getIndexMetadata().name))
-                                          .filter(Index::shouldBuildBlocking)
-                                          .collect(Collectors.toSet());
-            if (toRebuild.isEmpty())
+            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
+            return;
+        }
+
+        // Optimistically mark the indexes as writable, so we don't miss incoming writes
+        boolean needsFlush = false;
+        for (Index index : toRebuild)
+        {
+            String name = index.getIndexMetadata().name;
+            if (writableIndexes.put(name, index) == null)
             {
-                logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
-                return;
+                logger.info("Index [{}] became writable starting recovery.", name);
+                needsFlush = true;
             }
+        }
+
+        // Once we are tracking new writes, flush any memtable contents to not miss them from the sstable-based rebuild
+        if (needsFlush)
+            baseCfs.forceBlockingFlush();
 
+        // Now that we are tracking new writes and we haven't left untracked contents on the memtables, we are ready to
+        // index the sstables
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
+             Refs<SSTableReader> allSSTables = viewFragment.refs)
+        {
             buildIndexesBlocking(allSSTables, toRebuild, true);
         }
     }
@@ -426,8 +466,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
     }
 
     /**
-     * Performs a blocking (re)indexing of the specified SSTables for the specified indexes.
+     * Performs a blocking (re)indexing/recovery of the specified SSTables for the specified indexes.
      *
+     * If the index doesn't support ALL {@link Index.LoadType} it performs a recovery {@link Index#getRecoveryTaskSupport()}
+     * instead of a build {@link Index#getBuildTaskSupport()}
+     * 
      * @param sstables      the SSTables to be (re)indexed
      * @param indexes       the indexes to be (re)built for the specifed SSTables
      * @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise
@@ -443,15 +486,16 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
         markIndexesBuilding(indexes, isFullRebuild, false);
 
         // Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed:
-        final Set<Index> builtIndexes = new HashSet<>();
-        final Set<Index> unbuiltIndexes = new HashSet<>();
+        final Set<Index> builtIndexes = Sets.newConcurrentHashSet();
+        final Set<Index> unbuiltIndexes = Sets.newConcurrentHashSet();
 
         // Any exception thrown during index building that could be suppressed by the finally block
         Exception accumulatedFail = null;
 
         try
         {
-            logger.info("Submitting index build of {} for data in {}",
+            logger.info("Submitting index {} of {} for data in {}",
+                        isFullRebuild ? "recovery" : "build",
                         indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
                         sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
 
@@ -459,7 +503,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
             Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
             for (Index index : indexes)
             {
-                Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+                IndexBuildingSupport buildOrRecoveryTask = isFullRebuild
+                                                           ? index.getBuildTaskSupport()
+                                                           : index.getRecoveryTaskSupport();
+                Set<Index> stored = byType.computeIfAbsent(buildOrRecoveryTask, i -> new HashSet<>());
                 stored.add(index);
             }
 
@@ -474,7 +521,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                                    @Override
                                    public void onFailure(Throwable t)
                                    {
-                                       logAndMarkIndexesFailed(groupedIndexes, t);
+                                       logAndMarkIndexesFailed(groupedIndexes, t, false);
                                        unbuiltIndexes.addAll(groupedIndexes);
                                        build.setException(t);
                                    }
@@ -507,7 +554,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                 Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes));
                 if (!failedIndexes.isEmpty())
                 {
-                    logAndMarkIndexesFailed(failedIndexes, accumulatedFail);
+                    logAndMarkIndexesFailed(failedIndexes, accumulatedFail, false);
                 }
 
                 // Flush all built indexes with an aynchronous callback to log the success or failure of the flush
@@ -571,8 +618,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
      * the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
      * lock, but this is fine as the work done while holding such lock is trivial.
      * <p>
-     * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the
-     * rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
+     * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index, boolean)} should be always called after
+     * the rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
      *
      * @param indexes the index to be marked as building
      * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
@@ -620,7 +667,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
     {
         String indexName = index.getIndexMetadata().name;
         if (isFullRebuild)
-            queryableIndexes.add(indexName);
+        {
+            if (queryableIndexes.add(indexName))
+                logger.info("Index [{}] became queryable after successful build.", indexName);
+
+            if (writableIndexes.put(indexName, index) == null)
+                logger.info("Index [{}] became writable after successful build.", indexName);
+        }
         
         AtomicInteger counter = inProgressBuilds.get(indexName);
         if (counter != null)
@@ -640,10 +693,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
      * {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method.
      *
      * @param index the index to be marked as built
+     * @param isInitialBuild {@code true} if the index failed during its initial build, {@code false} otherwise
      */
-    private synchronized void markIndexFailed(Index index)
+    private synchronized void markIndexFailed(Index index, boolean isInitialBuild)
     {
         String indexName = index.getIndexMetadata().name;
+
         AtomicInteger counter = inProgressBuilds.get(indexName);
         if (counter != null)
         {
@@ -655,17 +710,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                 SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
 
             needsFullRebuild.add(indexName);
+
+            if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsWrites() && writableIndexes.remove(indexName) != null)
+                logger.info("Index [{}] became not-writable because of failed build.", indexName);
+
+            if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsReads() && queryableIndexes.remove(indexName))
+                logger.info("Index [{}] became not-queryable because of failed build.", indexName);
         }
     }
 
-    private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure)
+    private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure, boolean isInitialBuild)
     {
         JVMStabilityInspector.inspectThrowable(indexBuildFailure);
         if (indexBuildFailure != null)
             logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure);
         else
             logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes));
-        indexes.forEach(SecondaryIndexManager.this::markIndexFailed);
+        indexes.forEach(i -> this.markIndexFailed(i, isInitialBuild));
     }
 
     /**
@@ -677,6 +738,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
     {
         SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
         queryableIndexes.remove(indexName);
+        writableIndexes.remove(indexName);
         needsFullRebuild.remove(indexName);
         inProgressBuilds.remove(indexName);
     }
@@ -1081,9 +1143,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
             index.validate(update);
     }
 
-    /**
+    /*
      * IndexRegistry methods
      */
+
     public void registerIndex(Index index)
     {
         String name = index.getIndexMetadata().name;
@@ -1113,7 +1176,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
         return ImmutableSet.copyOf(indexes.values());
     }
 
-    /**
+    /*
      * Handling of index updates.
      * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
      * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
@@ -1126,17 +1189,19 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
     {
         if (!hasIndexes())
             return UpdateTransaction.NO_OP;
-
-        Index.Indexer[] indexers = indexes.values().stream()
-                                          .map(i -> i.indexerFor(update.partitionKey(),
-                                                                 update.columns(),
-                                                                 nowInSec,
-                                                                 ctx,
-                                                                 IndexTransaction.Type.UPDATE))
-                                          .filter(Objects::nonNull)
-                                          .toArray(Index.Indexer[]::new);
-
-        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
+        
+        ArrayList<Index.Indexer> idxrs = new ArrayList<>();
+        for (Index i : writableIndexes.values())
+        {
+            Index.Indexer idxr = i.indexerFor(update.partitionKey(), update.columns(), nowInSec, ctx, IndexTransaction.Type.UPDATE);
+            if (idxr != null)
+                idxrs.add(idxr);
+        }
+        
+        if (idxrs.size() == 0)
+            return UpdateTransaction.NO_OP;
+        else
+            return new WriteTimeTransaction(idxrs.toArray(new Index.Indexer[idxrs.size()]));
     }
 
     /**
@@ -1148,7 +1213,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                                                           int nowInSec)
     {
         // the check for whether there are any registered indexes is already done in CompactionIterator
-        return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, listIndexes());
+        return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, writableIndexes.values());
     }
 
     /**
@@ -1161,7 +1226,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
         if (!hasIndexes())
             return CleanupTransaction.NO_OP;
 
-        return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, listIndexes());
+        return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, writableIndexes.values());
     }
 
     /**
@@ -1283,7 +1348,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
 
         private IndexGCTransaction(DecoratedKey key,
                                    RegularAndStaticColumns columns,
-                                   Keyspace keyspace, int versions,
+                                   Keyspace keyspace,
+                                   int versions,
                                    int nowInSec,
                                    Collection<Index> indexes)
         {
@@ -1388,7 +1454,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
 
         private CleanupGCTransaction(DecoratedKey key,
                                      RegularAndStaticColumns columns,
-                                     Keyspace keyspace, int nowInSec,
+                                     Keyspace keyspace,
+                                     int nowInSec,
                                      Collection<Index> indexes)
         {
             this.key = key;
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 58056b9..f74a656 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -145,7 +144,7 @@ public abstract class CassandraIndex implements Index
                                                   Clustering clustering,
                                                   CellPath path,
                                                   ByteBuffer cellValue);
-
+    
     public ColumnMetadata getIndexedColumn()
     {
         return indexedColumn;
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 07327ea..592499e 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi;
 
 import java.util.*;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 
 import com.googlecode.concurrenttrees.common.Iterables;
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 8f3b97d..9563780 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -22,6 +22,8 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 
+import com.google.common.collect.ImmutableSet;
+
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
@@ -1053,6 +1055,84 @@ public class SecondaryIndexTest extends CQLTester
         }
     }
 
+    @Test // A Bad init could leave an index only accepting reads
+    public void testReadOnlyIndex() throws Throwable
+    {
+        // On successful initialization both reads and writes go through
+        String tableName = createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+        String indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + ReadOnlyOnFailureIndex.class.getName() + "'");
+        assertTrue(waitForIndex(keyspace(), tableName, indexName));
+        execute("SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+        ReadOnlyOnFailureIndex index = (ReadOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+        assertEquals(1, index.rowsInserted.size());
+
+        // Upon rebuild, both reads and writes still go through
+        getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+        assertEquals(1, index.rowsInserted.size());
+        execute("SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+        assertEquals(2, index.rowsInserted.size());
+        dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+
+        // On bad initial build writes are not forwarded to the index
+        ReadOnlyOnFailureIndex.failInit = true;
+        indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + ReadOnlyOnFailureIndex.class.getName() + "'");
+        index = (ReadOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+        assertTrue(waitForIndexBuilds(keyspace(), indexName));
+        assertInvalidThrow(IndexNotAvailableException.class, "SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+        assertEquals(0, index.rowsInserted.size());
+
+        // Upon recovery, we can index data again
+        index.reset();
+        getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+        assertEquals(2, index.rowsInserted.size());
+        execute("SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+        assertEquals(3, index.rowsInserted.size());
+        dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+    }
+
+    @Test  // A Bad init could leave an index only accepting writes
+    public void testWriteOnlyIndex() throws Throwable
+    {
+        // On successful initialization both reads and writes go through
+        String tableName = createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+        String indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + WriteOnlyOnFailureIndex.class.getName() + "'");
+        assertTrue(waitForIndex(keyspace(), tableName, indexName));
+        execute("SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+        WriteOnlyOnFailureIndex index = (WriteOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+        assertEquals(1, index.rowsInserted.size());
+
+        // Upon rebuild, both reads and writes still go through
+        getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+        assertEquals(1, index.rowsInserted.size());
+        execute("SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+        assertEquals(2, index.rowsInserted.size());
+        dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+
+        // On bad initial build writes are forwarded to the index
+        WriteOnlyOnFailureIndex.failInit = true;
+        indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + WriteOnlyOnFailureIndex.class.getName() + "'");
+        index = (WriteOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+        assertTrue(waitForIndexBuilds(keyspace(), indexName));
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+        assertEquals(1, index.rowsInserted.size());
+        assertInvalidThrow(IndexNotAvailableException.class, "SELECT value FROM %s WHERE value = 1");
+
+        // Upon recovery, we can query data again
+        index.reset();
+        getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+        assertEquals(2, index.rowsInserted.size());
+        execute("SELECT value FROM %s WHERE value = 1");
+        execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+        assertEquals(3, index.rowsInserted.size());
+        dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+    }
+
     @Test
     public void droppingIndexInvalidatesPreparedStatements() throws Throwable
     {
@@ -1375,6 +1455,7 @@ public class SecondaryIndexTest extends CQLTester
 
         execute("INSERT INTO %s (k, v) VALUES (?, ?)", 0, udt1);
         String indexName = createIndex("CREATE INDEX ON %s (v)");
+
         execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, udt2);
         execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, udt1);
         assertTrue(waitForIndex(keyspace(), tableName, indexName));
@@ -1560,4 +1641,62 @@ public class SecondaryIndexTest extends CQLTester
             return super.getInvalidateTask();
         }
     }
+
+    /**
+     * {@code StubIndex} that only supports some load. Could be intentional or a result of a bad init.
+     */
+    public static class LoadTypeConstrainedIndex extends StubIndex
+    {
+        static volatile boolean failInit = false;
+        final LoadType supportedLoadOnFailure;
+
+        LoadTypeConstrainedIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef, LoadType supportedLoadOnFailure)
+        {
+            super(baseCfs, indexDef);
+            this.supportedLoadOnFailure = supportedLoadOnFailure;
+        }
+
+        @Override
+        public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+        {
+            return supportedLoadOnFailure;
+        }
+
+        @Override
+        public void reset()
+        {
+            super.reset();
+            failInit = false;
+        }
+
+        @Override
+        public Callable<?> getInitializationTask()
+        {
+            if (failInit)
+                return () -> {throw new IllegalStateException("Index is configured to fail.");};
+
+            return null;
+        }
+
+        public boolean shouldBuildBlocking()
+        {
+            return true;
+        }
+    }
+
+    public static class ReadOnlyOnFailureIndex extends LoadTypeConstrainedIndex
+    {
+        public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+        {
+            super(baseCfs, indexDef, LoadType.READ);
+        }
+    }
+
+    public static class WriteOnlyOnFailureIndex extends LoadTypeConstrainedIndex
+    {
+        public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+        {
+            super(baseCfs, indexDef, LoadType.WRITE);
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index c09b16c..bfe1c6d 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -484,7 +484,7 @@ public class SecondaryIndexTest
                    .build();
         MigrationManager.announceTableUpdate(updated, true);
 
-        // fait for the index to be built
+        // wait for the index to be built
         Index index = cfs.indexManager.getIndex(indexDef);
         do
         {
diff --git a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java
index 2207f48..d8fb99f 100644
--- a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java
+++ b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Test;
 
@@ -67,14 +68,14 @@ public class SecondaryIndexManagerTest extends CQLTester
     }
 
     @Test
-    public void rebuildingIndexMarksTheIndexAsBuilt() throws Throwable
+    public void rebuilOrRecoveringIndexMarksTheIndexAsBuilt() throws Throwable
     {
         String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
         String indexName = createIndex("CREATE INDEX ON %s(c)");
 
         waitForIndex(KEYSPACE, tableName, indexName);
         assertMarkedAsBuilt(indexName);
-
+        
         assertTrue(tryRebuild(indexName, false));
         assertMarkedAsBuilt(indexName);
     }
@@ -119,56 +120,70 @@ public class SecondaryIndexManagerTest extends CQLTester
     }
 
     @Test
-    public void cannotRebuildWhileInitializationIsInProgress() throws Throwable
+    public void cannotRebuildRecoverWhileInitializationIsInProgress() throws Throwable
     {
         // create an index which blocks on creation
         TestingIndex.blockCreate();
         String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-        String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
-
-        // try to rebuild the index before the index creation task has finished
-        assertFalse(tryRebuild(indexName, false));
-        assertNotMarkedAsBuilt(indexName);
+        String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+        String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
+
+        // try to rebuild/recover the index before the index creation task has finished
+        assertFalse(tryRebuild(defaultIndexName, false));
+        assertFalse(tryRebuild(readOnlyIndexName, false));
+        assertFalse(tryRebuild(writeOnlyIndexName, false));
+        assertNotMarkedAsBuilt(defaultIndexName);
+        assertNotMarkedAsBuilt(readOnlyIndexName);
+        assertNotMarkedAsBuilt(writeOnlyIndexName);
 
         // check that the index is marked as built when the creation finishes
         TestingIndex.unblockCreate();
-        waitForIndex(KEYSPACE, tableName, indexName);
-        assertMarkedAsBuilt(indexName);
-
-        // now verify you can rebuild
-        assertTrue(tryRebuild(indexName, false));
-        assertMarkedAsBuilt(indexName);
+        waitForIndex(KEYSPACE, tableName, defaultIndexName);
+        waitForIndex(KEYSPACE, tableName, readOnlyIndexName);
+        waitForIndex(KEYSPACE, tableName, writeOnlyIndexName);
+        assertMarkedAsBuilt(defaultIndexName);
+        assertMarkedAsBuilt(readOnlyIndexName);
+        assertMarkedAsBuilt(writeOnlyIndexName);
+
+        // now verify you can rebuild/recover
+        assertTrue(tryRebuild(defaultIndexName, false));
+        assertTrue(tryRebuild(readOnlyIndexName, false));
+        assertTrue(tryRebuild(writeOnlyIndexName, false));
+        assertMarkedAsBuilt(defaultIndexName);
+        assertMarkedAsBuilt(readOnlyIndexName);
+        assertMarkedAsBuilt(writeOnlyIndexName);
     }
 
     @Test
-    public void cannotRebuildWhileAnotherRebuildIsInProgress() throws Throwable
+    public void cannotRebuildOrRecoverWhileAnotherRebuildIsInProgress() throws Throwable
     {
-        final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-        final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
+        String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+        String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
         final AtomicBoolean error = new AtomicBoolean();
 
         // wait for index initialization and verify it's built:
-        waitForIndex(KEYSPACE, tableName, indexName);
-        assertMarkedAsBuilt(indexName);
+        waitForIndex(KEYSPACE, tableName, defaultIndexName);
+        waitForIndex(KEYSPACE, tableName, readOnlyIndexName);
+        waitForIndex(KEYSPACE, tableName, writeOnlyIndexName);
+        assertMarkedAsBuilt(defaultIndexName);
+        assertMarkedAsBuilt(readOnlyIndexName);
+        assertMarkedAsBuilt(writeOnlyIndexName);
 
         // rebuild the index in another thread, but make it block:
         TestingIndex.blockBuild();
-        Thread asyncBuild = new Thread()
-        {
-
-            @Override
-            public void run()
+        Thread asyncBuild = new Thread(() -> {
+            try
             {
-                try
-                {
-                    tryRebuild(indexName, false);
-                }
-                catch (Throwable ex)
-                {
-                    error.set(true);
-                }
+                tryRebuild(defaultIndexName, false);
             }
-        };
+            catch (Throwable ex)
+            {
+                error.set(true);
+            }
+        });
         asyncBuild.start();
 
         // wait for the rebuild to block, so that we can proceed unblocking all further operations:
@@ -178,17 +193,23 @@ public class SecondaryIndexManagerTest extends CQLTester
         TestingIndex.shouldBlockBuild = false;
 
         // verify rebuilding the index before the previous index build task has finished fails
-        assertFalse(tryRebuild(indexName, false));
-        assertNotMarkedAsBuilt(indexName);
+        assertFalse(tryRebuild(defaultIndexName, false));
+        assertNotMarkedAsBuilt(defaultIndexName);
 
         // check that the index is marked as built when the build finishes
         TestingIndex.unblockBuild();
         asyncBuild.join();
-        assertMarkedAsBuilt(indexName);
+        assertMarkedAsBuilt(defaultIndexName);
+        assertMarkedAsBuilt(readOnlyIndexName);
+        assertMarkedAsBuilt(writeOnlyIndexName);
 
         // now verify you can rebuild
-        assertTrue(tryRebuild(indexName, false));
-        assertMarkedAsBuilt(indexName);
+        assertTrue(tryRebuild(defaultIndexName, false));
+        assertTrue(tryRebuild(readOnlyIndexName, false));
+        assertTrue(tryRebuild(writeOnlyIndexName, false));
+        assertMarkedAsBuilt(defaultIndexName);
+        assertMarkedAsBuilt(readOnlyIndexName);
+        assertMarkedAsBuilt(writeOnlyIndexName);
     }
 
     @Test
@@ -204,23 +225,17 @@ public class SecondaryIndexManagerTest extends CQLTester
 
         // add sstables in another thread, but make it block:
         TestingIndex.blockBuild();
-        Thread asyncBuild = new Thread()
-        {
-
-            @Override
-            public void run()
+        Thread asyncBuild = new Thread(() -> {
+            ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+            try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL)))
             {
-                ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-                try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL)))
-                {
-                    cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker());
-                }
-                catch (Throwable ex)
-                {
-                    error.set(true);
-                }
+                cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker());
+            }
+            catch (Throwable ex)
+            {
+                error.set(true);
             }
-        };
+        });
         asyncBuild.start();
 
         // wait for the build to block, so that we can proceed unblocking all further operations:
@@ -256,22 +271,16 @@ public class SecondaryIndexManagerTest extends CQLTester
 
         // rebuild the index in another thread, but make it block:
         TestingIndex.blockBuild();
-        Thread asyncBuild = new Thread()
-        {
-
-            @Override
-            public void run()
+        Thread asyncBuild = new Thread(() -> {
+            try
             {
-                try
-                {
-                    tryRebuild(indexName, false);
-                }
-                catch (Throwable ex)
-                {
-                    error.set(true);
-                }
+                tryRebuild(indexName, false);
+            }
+            catch (Throwable ex)
+            {
+                error.set(true);
             }
-        };
+        });
         asyncBuild.start();
 
         // wait for the rebuild to block, so that we can proceed unblocking all further operations:
@@ -310,22 +319,16 @@ public class SecondaryIndexManagerTest extends CQLTester
 
         // rebuild the index in another thread, but make it block:
         TestingIndex.blockBuild();
-        Thread asyncBuild = new Thread()
-        {
-
-            @Override
-            public void run()
+        Thread asyncBuild = new Thread(() -> {
+            try
             {
-                try
-                {
-                    tryRebuild(indexName, false);
-                }
-                catch (Throwable ex)
-                {
-                    error.set(true);
-                }
+                tryRebuild(indexName, false);
             }
-        };
+            catch (Throwable ex)
+            {
+                error.set(true);
+            }
+        });
         asyncBuild.start();
 
         // wait for the rebuild to block, so that we can proceed unblocking all further operations:
@@ -382,37 +385,57 @@ public class SecondaryIndexManagerTest extends CQLTester
     }
 
     @Test
-    public void initializingIndexNotQueryable() throws Throwable
+    public void initializingIndexNotQueryableButMaybeWritable() throws Throwable
     {
         TestingIndex.blockCreate();
         String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-        String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+        String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
 
         // the index shouldn't be queryable while the initialization hasn't finished
-        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-        Index index = cfs.indexManager.getIndexByName(indexName);
-        assertFalse(cfs.indexManager.isIndexQueryable(index));
+        assertFalse(isQueryable(defaultIndexName));
+        assertFalse(isQueryable(readOnlyIndexName));
+        assertFalse(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertTrue(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
 
         // the index should be queryable once the initialization has finished
         TestingIndex.unblockCreate();
-        waitForIndex(KEYSPACE, tableName, indexName);
-        assertTrue(cfs.indexManager.isIndexQueryable(index));
+        waitForIndex(KEYSPACE, tableName, defaultIndexName);
+        waitForIndex(KEYSPACE, tableName, readOnlyIndexName);
+        waitForIndex(KEYSPACE, tableName, writeOnlyIndexName);
+        assertTrue(isQueryable(defaultIndexName));
+        assertTrue(isQueryable(readOnlyIndexName));
+        assertTrue(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertTrue(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
     }
 
     @Test
-    public void initializingIndexNotQueryableAfterPartialRebuild() throws Throwable
+    public void initializingIndexNotQueryableButMaybeNotWritableAfterPartialRebuild() throws Throwable
     {
         TestingIndex.blockCreate();
         String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-        String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+        String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
 
-        // the index shouldn't be queryable while the initialization hasn't finished
-        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-        Index index = cfs.indexManager.getIndexByName(indexName);
-        assertFalse(cfs.indexManager.isIndexQueryable(index));
+        // the index should never be queryable while the initialization hasn't finished
+        assertFalse(isQueryable(defaultIndexName));
+        assertFalse(isQueryable(readOnlyIndexName));
+        assertFalse(isQueryable(writeOnlyIndexName));
+
+        // the index should always be writable while the initialization hasn't finished
+        assertTrue(isWritable(defaultIndexName));
+        assertTrue(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
 
-        // a failing partial build doesn't set the index as queryable
+        // a failing partial build doesn't set the index as queryable, but might set it as not writable
         TestingIndex.shouldFailBuild = true;
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
         try
         {
             cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this);
@@ -422,59 +445,99 @@ public class SecondaryIndexManagerTest extends CQLTester
         {
             assertTrue(ex.getMessage().contains("configured to fail"));
         }
-        assertFalse(cfs.indexManager.isIndexQueryable(index));
-
-        // a successful partial build doesn't set the index as queryable
+        assertFalse(isQueryable(defaultIndexName));
+        assertFalse(isQueryable(readOnlyIndexName));
+        assertFalse(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertFalse(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
+
+        // a successful partial build doesn't set the index as queryable nor writable
         TestingIndex.shouldFailBuild = false;
         cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this);
-        assertFalse(cfs.indexManager.isIndexQueryable(index));
+        assertFalse(isQueryable(defaultIndexName));
+        assertFalse(isQueryable(readOnlyIndexName));
+        assertFalse(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertFalse(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
 
         // the index should be queryable once the initialization has finished
         TestingIndex.unblockCreate();
-        waitForIndex(KEYSPACE, tableName, indexName);
-        assertTrue(cfs.indexManager.isIndexQueryable(index));
+        waitForIndex(KEYSPACE, tableName, defaultIndexName);
+        assertTrue(isQueryable(defaultIndexName));
+        assertTrue(isQueryable(readOnlyIndexName));
+        assertTrue(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertTrue(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
     }
 
     @Test
-    public void indexWithFailedInitializationIsQueryableAfterFullRebuild() throws Throwable
+    public void indexWithFailedInitializationIsQueryableAndWritableAfterFullRebuild() throws Throwable
     {
-        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-
         TestingIndex.shouldFailCreate = true;
-        String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
-
-        tryRebuild(indexName, true);
+        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
+        String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+        String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
+        assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName));
+        assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName));
+        assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName));
+
+        tryRebuild(defaultIndexName, true);
+        tryRebuild(readOnlyIndexName, true);
+        tryRebuild(writeOnlyIndexName, true);
         TestingIndex.shouldFailCreate = false;
 
-        // a successfull full rebuild should set the index as queryable
+        // a successful full rebuild should set the index as queryable/writable
         ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-        Index index = cfs.indexManager.getIndexByName(indexName);
-        cfs.indexManager.rebuildIndexesBlocking(Collections.singleton(indexName));
-        assertTrue(cfs.indexManager.isIndexQueryable(index));
+        cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(defaultIndexName, readOnlyIndexName, writeOnlyIndexName));
+        assertTrue(isQueryable(defaultIndexName));
+        assertTrue(isQueryable(readOnlyIndexName));
+        assertTrue(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertTrue(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
     }
 
     @Test
-    public void indexWithFailedInitializationIsNotQueryableAfterPartialRebuild() throws Throwable
+    public void indexWithFailedInitializationDoesNotChangeQueryabilityNorWritabilityAfterPartialRebuild() throws Throwable
     {
         TestingIndex.shouldFailCreate = true;
         createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-        String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
-        assertTrue(waitForIndexBuilds(KEYSPACE, indexName));
+        String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+        String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+        String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
+        assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName));
+        assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName));
+        assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName));
         TestingIndex.shouldFailCreate = false;
 
-        // the index shouldn't be queryable after the failed initialization
-        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
-        Index index = cfs.indexManager.getIndexByName(indexName);
-        assertFalse(cfs.indexManager.isIndexQueryable(index));
+        // the index should never be queryable, but it could be writable after the failed initialization
+        assertFalse(isQueryable(defaultIndexName));
+        assertFalse(isQueryable(readOnlyIndexName));
+        assertFalse(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertFalse(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
 
-        // a successful partial build doesn't set the index as queryable
+        // a successful partial build doesn't set the index as queryable nor writable
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
         cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this);
-        assertTrue(waitForIndexBuilds(KEYSPACE, indexName));
-        assertFalse(cfs.indexManager.isIndexQueryable(index));
+        assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName));
+        assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName));
+        assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName));
+        assertFalse(isQueryable(defaultIndexName));
+        assertFalse(isQueryable(readOnlyIndexName));
+        assertFalse(isQueryable(writeOnlyIndexName));
+        assertTrue(isWritable(defaultIndexName));
+        assertFalse(isWritable(readOnlyIndexName));
+        assertTrue(isWritable(writeOnlyIndexName));
     }
 
     @Test
-    public void handleJVMStablityOnFailedCreate() throws Throwable
+    public void handleJVMStablityOnFailedCreate()
     {
         handleJVMStablityOnFailedCreate(new SocketException("Should not fail"), false);
         handleJVMStablityOnFailedCreate(new FileNotFoundException("Should not fail"), false);
@@ -483,7 +546,7 @@ public class SecondaryIndexManagerTest extends CQLTester
         handleJVMStablityOnFailedCreate(new RuntimeException("Should not fail"), false);
     }
 
-    private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM) throws Throwable
+    private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM)
     {
         KillerForTests killerForTests = new KillerForTests();
         JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
@@ -584,71 +647,100 @@ public class SecondaryIndexManagerTest extends CQLTester
         return done;
     }
 
+    private boolean isQueryable(String indexName)
+    {
+        SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager;
+        Index index = manager.getIndexByName(indexName);
+        return manager.isIndexQueryable(index);
+    }
+
+    private boolean isWritable(String indexName)
+    {
+        SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager;
+        Index index = manager.getIndexByName(indexName);
+        return manager.isIndexWritable(index);
+    }
+
     public static class TestingIndex extends StubIndex
     {
         private static volatile CountDownLatch createLatch;
         private static volatile CountDownLatch buildLatch;
         private static volatile CountDownLatch createWaitLatch;
         private static volatile CountDownLatch buildWaitLatch;
-        public static volatile boolean shouldBlockCreate = false;
-        public static volatile boolean shouldBlockBuild = false;
-        public static volatile boolean shouldFailCreate = false;
-        public static volatile boolean shouldFailBuild = false;
-        public static volatile Throwable failedCreateThrowable;
-        public static volatile Throwable failedBuildTrowable;
-
+        static volatile boolean shouldBlockCreate = false;
+        static volatile boolean shouldBlockBuild = false;
+        static volatile boolean shouldFailCreate = false;
+        static volatile boolean shouldFailBuild = false;
+        static volatile Throwable failedCreateThrowable;
+        static volatile Throwable failedBuildTrowable;
+
+        @SuppressWarnings("WeakerAccess")
         public TestingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
         {
             super(baseCfs, metadata);
         }
 
-        public static void blockCreate()
+        static void blockCreate()
         {
             shouldBlockCreate = true;
             createLatch = new CountDownLatch(1);
             createWaitLatch = new CountDownLatch(1);
         }
 
-        public static void blockBuild()
+        static void blockBuild()
         {
             shouldBlockBuild = true;
             buildLatch = new CountDownLatch(1);
             buildWaitLatch = new CountDownLatch(1);
         }
 
-        public static void unblockCreate()
+        static void unblockCreate()
         {
             createLatch.countDown();
         }
 
-        public static void unblockBuild()
+        static void unblockBuild()
         {
             buildLatch.countDown();
         }
 
-        public static void waitBlockedOnCreate() throws InterruptedException
+        static void waitBlockedOnCreate() throws InterruptedException
         {
             createWaitLatch.await();
         }
 
-        public static void waitBlockedOnBuild() throws InterruptedException
+        static void waitBlockedOnBuild() throws InterruptedException
         {
             buildWaitLatch.await();
         }
 
-        public static void clear()
+        static void clear()
         {
+            reset(createLatch);
+            reset(createWaitLatch);
+            reset(buildLatch);
+            reset(buildWaitLatch);
             createLatch = null;
             createWaitLatch = null;
             buildLatch = null;
             buildWaitLatch = null;
             shouldBlockCreate = false;
             shouldBlockBuild = false;
+            shouldFailCreate = false;
             shouldFailBuild = false;
             failedCreateThrowable = null;
             failedBuildTrowable = null;
         }
 
+        private static void reset(CountDownLatch latch)
+        {
+            if (latch == null)
+                return;
+
+            while (0L < latch.getCount())
+                latch.countDown();
+        }
+
         public Callable<?> getInitializationTask()
         {
             return () ->
@@ -719,4 +811,38 @@ public class SecondaryIndexManagerTest extends CQLTester
             return true;
         }
     }
+
+    /**
+     * <code>TestingIndex</code> that only supports reads when initial build or full rebuild has failed.
+     */
+    public static class ReadOnlyOnFailureIndex extends TestingIndex
+    {
+        public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+        {
+            super(baseCfs, indexDef);
+        }
+
+        @Override
+        public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+        {
+            return LoadType.READ;
+        }
+    }
+
+    /**
+     * <code>TestingIndex</code> that only supports writes when initial build or full rebuild has failed.
+     */
+    public static class WriteOnlyOnFailureIndex extends TestingIndex
+    {
+        public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+        {
+            super(baseCfs, indexDef);
+        }
+
+        @Override
+        public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+        {
+            return LoadType.WRITE;
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index a351cce..02ccbff 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * Basic custom index implementation for testing.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org