You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/05/29 11:39:27 UTC
[cassandra] branch trunk updated: Improve handling of 2i
initialization failures
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 595dc61 Improve handling of 2i initialization failures
595dc61 is described below
commit 595dc61290a9fda15b6765711141039ec9609bb3
Author: Bereng <be...@gmail.com>
AuthorDate: Fri May 29 12:33:50 2020 +0100
Improve handling of 2i initialization failures
patch by Berenguer Blasi; reviewed by Andres de la Peña for CASSANDRA-13606
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/index/Index.java | 39 +-
.../cassandra/index/SecondaryIndexManager.java | 155 +++++---
.../cassandra/index/internal/CassandraIndex.java | 3 +-
.../org/apache/cassandra/index/sasi/SASIIndex.java | 1 -
.../validation/entities/SecondaryIndexTest.java | 139 +++++++
.../apache/cassandra/db/SecondaryIndexTest.java | 2 +-
.../cassandra/index/SecondaryIndexManagerTest.java | 400 ++++++++++++++-------
.../unit/org/apache/cassandra/index/StubIndex.java | 1 -
9 files changed, 553 insertions(+), 188 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9b62b06..7fbd0b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Improve handling of 2i initialization failures (CASSANDRA-13606)
* Add completion_ratio column to sstable_tasks virtual table (CASANDRA-15759)
* Add support for adding custom Verbs (CASSANDRA-15725)
* Speed up entire-file-streaming file containment check and allow entire-file-streaming for all compaction strategies (CASSANDRA-15657,CASSANDRA-15783)
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index b9f38ce..6d716be 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
/**
* Consisting of a top level Index interface and two sub-interfaces which handle read and write operations,
@@ -136,6 +135,23 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
*/
public interface Index
{
+ /**
+ * Supported loads. An index could be badly initialized and support only reads i.e.
+ */
+ public enum LoadType
+ {
+ READ, WRITE, ALL, NOOP;
+
+ public boolean supportsWrites()
+ {
+ return this == ALL || this == WRITE;
+ }
+
+ public boolean supportsReads()
+ {
+ return this == ALL || this == READ;
+ }
+ }
/*
* Helpers for building indexes from SSTable data
@@ -180,13 +196,32 @@ public interface Index
* single pass through the data. The singleton instance returned from the default method implementation builds
* indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data.
*
- * @return an instance of the index build taski helper. Index implementations which return <b>the same instance</b>
+ * @return an instance of the index build task helper. Index implementations which return <b>the same instance</b>
* will be built using a single task.
*/
default IndexBuildingSupport getBuildTaskSupport()
{
return INDEX_BUILDER_SUPPORT;
}
+
+ /**
+ * Same as {@code getBuildTaskSupport} but can be overloaded with a specific 'recover' logic different than the index building one
+ */
+ default IndexBuildingSupport getRecoveryTaskSupport()
+ {
+ return getBuildTaskSupport();
+ }
+
+ /**
+ * Returns the type of operations supported by the index in case its building has failed and it's needing recovery.
+ *
+ * @param isInitialBuild {@code true} if the failure is for the initial build task on index creation, {@code false}
+ * if the failure is for a full rebuild or recovery.
+ */
+ default LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+ {
+ return isInitialBuild ? LoadType.WRITE : LoadType.ALL;
+ }
/**
* Return a task to perform any initialization work when a new index instance is created.
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 7f28c44..3822549 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index.IndexBuildingSupport;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -144,6 +145,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* The indexes that are available for querying.
*/
private final Set<String> queryableIndexes = Sets.newConcurrentHashSet();
+
+ /**
+ * The indexes that are available for writing.
+ */
+ private final Map<String, Index> writableIndexes = Maps.newConcurrentMap();
/**
* The count of pending index builds for each index.
@@ -207,6 +213,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{
final Index index = createInstance(indexDef);
index.register(this);
+ if (writableIndexes.put(index.getIndexMetadata().name, index) == null)
+ logger.info("Index [{}] registered and writable.", index.getIndexMetadata().name);
markIndexesBuilding(ImmutableSet.of(index), true, isNewCF);
@@ -220,7 +228,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
}
catch (Throwable t)
{
- logAndMarkIndexesFailed(Collections.singleton(index), t);
+ logAndMarkIndexesFailed(Collections.singleton(index), t, true);
throw t;
}
}
@@ -239,7 +247,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
@Override
public void onFailure(Throwable t)
{
- logAndMarkIndexesFailed(Collections.singleton(index), t);
+ logAndMarkIndexesFailed(Collections.singleton(index), t, true);
initialization.setException(t);
}
@@ -273,12 +281,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* Checks if the specified index is queryable.
*
* @param index the index
- * @return <code>true</code> if the specified index is registered, <code>false</code> otherwise
+ * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
*/
public boolean isIndexQueryable(Index index)
{
return queryableIndexes.contains(index.getIndexMetadata().name);
}
+
+ /**
+ * Checks if the specified index is writable.
+ *
+ * @param index the index
+ * @return <code>true</code> if the specified index is writable, <code>false</code> otherwise
+ */
+ public boolean isIndexWritable(Index index)
+ {
+ return writableIndexes.containsKey(index.getIndexMetadata().name);
+ }
/**
* Checks if the specified index has any running build task.
@@ -326,8 +345,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
}
/**
- * Does a blocking full rebuild of the specifed indexes from all the sstables in the base table.
- * Note also that this method of (re)building indexes:
+ * Does a blocking full rebuild/recovery of the specifed indexes from all the sstables in the base table.
+ * Note also that this method of (re)building/recovering indexes:
* a) takes a set of index *names* rather than Indexers
* b) marks existing indexes removed prior to rebuilding
* c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run
@@ -337,19 +356,40 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
*/
public void rebuildIndexesBlocking(Set<String> indexNames)
{
- try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
- Refs<SSTableReader> allSSTables = viewFragment.refs)
+ // Get the set of indexes that require blocking build
+ Set<Index> toRebuild = indexes.values()
+ .stream()
+ .filter(index -> indexNames.contains(index.getIndexMetadata().name))
+ .filter(Index::shouldBuildBlocking)
+ .collect(Collectors.toSet());
+
+ if (toRebuild.isEmpty())
{
- Set<Index> toRebuild = indexes.values().stream()
- .filter(index -> indexNames.contains(index.getIndexMetadata().name))
- .filter(Index::shouldBuildBlocking)
- .collect(Collectors.toSet());
- if (toRebuild.isEmpty())
+ logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
+ return;
+ }
+
+ // Optimistically mark the indexes as writable, so we don't miss incoming writes
+ boolean needsFlush = false;
+ for (Index index : toRebuild)
+ {
+ String name = index.getIndexMetadata().name;
+ if (writableIndexes.put(name, index) == null)
{
- logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
- return;
+ logger.info("Index [{}] became writable starting recovery.", name);
+ needsFlush = true;
}
+ }
+
+ // Once we are tracking new writes, flush any memtable contents to not miss them from the sstable-based rebuild
+ if (needsFlush)
+ baseCfs.forceBlockingFlush();
+ // Now that we are tracking new writes and we haven't left untracked contents on the memtables, we are ready to
+ // index the sstables
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
+ Refs<SSTableReader> allSSTables = viewFragment.refs)
+ {
buildIndexesBlocking(allSSTables, toRebuild, true);
}
}
@@ -426,8 +466,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
}
/**
- * Performs a blocking (re)indexing of the specified SSTables for the specified indexes.
+ * Performs a blocking (re)indexing/recovery of the specified SSTables for the specified indexes.
*
+ * If the index doesn't support ALL {@link Index.LoadType} it performs a recovery {@link Index#getRecoveryTaskSupport()}
+ * instead of a build {@link Index#getBuildTaskSupport()}
+ *
* @param sstables the SSTables to be (re)indexed
* @param indexes the indexes to be (re)built for the specifed SSTables
* @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise
@@ -443,15 +486,16 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
markIndexesBuilding(indexes, isFullRebuild, false);
// Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed:
- final Set<Index> builtIndexes = new HashSet<>();
- final Set<Index> unbuiltIndexes = new HashSet<>();
+ final Set<Index> builtIndexes = Sets.newConcurrentHashSet();
+ final Set<Index> unbuiltIndexes = Sets.newConcurrentHashSet();
// Any exception thrown during index building that could be suppressed by the finally block
Exception accumulatedFail = null;
try
{
- logger.info("Submitting index build of {} for data in {}",
+ logger.info("Submitting index {} of {} for data in {}",
+ isFullRebuild ? "recovery" : "build",
indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
@@ -459,7 +503,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
for (Index index : indexes)
{
- Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+ IndexBuildingSupport buildOrRecoveryTask = isFullRebuild
+ ? index.getBuildTaskSupport()
+ : index.getRecoveryTaskSupport();
+ Set<Index> stored = byType.computeIfAbsent(buildOrRecoveryTask, i -> new HashSet<>());
stored.add(index);
}
@@ -474,7 +521,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
@Override
public void onFailure(Throwable t)
{
- logAndMarkIndexesFailed(groupedIndexes, t);
+ logAndMarkIndexesFailed(groupedIndexes, t, false);
unbuiltIndexes.addAll(groupedIndexes);
build.setException(t);
}
@@ -507,7 +554,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes));
if (!failedIndexes.isEmpty())
{
- logAndMarkIndexesFailed(failedIndexes, accumulatedFail);
+ logAndMarkIndexesFailed(failedIndexes, accumulatedFail, false);
}
// Flush all built indexes with an aynchronous callback to log the success or failure of the flush
@@ -571,8 +618,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
* lock, but this is fine as the work done while holding such lock is trivial.
* <p>
- * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the
- * rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
+ * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index, boolean)} should be always called after
+ * the rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
*
* @param indexes the index to be marked as building
* @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
@@ -620,7 +667,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{
String indexName = index.getIndexMetadata().name;
if (isFullRebuild)
- queryableIndexes.add(indexName);
+ {
+ if (queryableIndexes.add(indexName))
+ logger.info("Index [{}] became queryable after successful build.", indexName);
+
+ if (writableIndexes.put(indexName, index) == null)
+ logger.info("Index [{}] became writable after successful build.", indexName);
+ }
AtomicInteger counter = inProgressBuilds.get(indexName);
if (counter != null)
@@ -640,10 +693,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method.
*
* @param index the index to be marked as built
+ * @param isInitialBuild {@code true} if the index failed during its initial build, {@code false} otherwise
*/
- private synchronized void markIndexFailed(Index index)
+ private synchronized void markIndexFailed(Index index, boolean isInitialBuild)
{
String indexName = index.getIndexMetadata().name;
+
AtomicInteger counter = inProgressBuilds.get(indexName);
if (counter != null)
{
@@ -655,17 +710,23 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
needsFullRebuild.add(indexName);
+
+ if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsWrites() && writableIndexes.remove(indexName) != null)
+ logger.info("Index [{}] became not-writable because of failed build.", indexName);
+
+ if (!index.getSupportedLoadTypeOnFailure(isInitialBuild).supportsReads() && queryableIndexes.remove(indexName))
+ logger.info("Index [{}] became not-queryable because of failed build.", indexName);
}
}
- private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure)
+ private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure, boolean isInitialBuild)
{
JVMStabilityInspector.inspectThrowable(indexBuildFailure);
if (indexBuildFailure != null)
logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure);
else
logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes));
- indexes.forEach(SecondaryIndexManager.this::markIndexFailed);
+ indexes.forEach(i -> this.markIndexFailed(i, isInitialBuild));
}
/**
@@ -677,6 +738,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{
SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
queryableIndexes.remove(indexName);
+ writableIndexes.remove(indexName);
needsFullRebuild.remove(indexName);
inProgressBuilds.remove(indexName);
}
@@ -1081,9 +1143,10 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
index.validate(update);
}
- /**
+ /*
* IndexRegistry methods
*/
+
public void registerIndex(Index index)
{
String name = index.getIndexMetadata().name;
@@ -1113,7 +1176,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
return ImmutableSet.copyOf(indexes.values());
}
- /**
+ /*
* Handling of index updates.
* Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
* during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
@@ -1126,17 +1189,19 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
{
if (!hasIndexes())
return UpdateTransaction.NO_OP;
-
- Index.Indexer[] indexers = indexes.values().stream()
- .map(i -> i.indexerFor(update.partitionKey(),
- update.columns(),
- nowInSec,
- ctx,
- IndexTransaction.Type.UPDATE))
- .filter(Objects::nonNull)
- .toArray(Index.Indexer[]::new);
-
- return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
+
+ ArrayList<Index.Indexer> idxrs = new ArrayList<>();
+ for (Index i : writableIndexes.values())
+ {
+ Index.Indexer idxr = i.indexerFor(update.partitionKey(), update.columns(), nowInSec, ctx, IndexTransaction.Type.UPDATE);
+ if (idxr != null)
+ idxrs.add(idxr);
+ }
+
+ if (idxrs.size() == 0)
+ return UpdateTransaction.NO_OP;
+ else
+ return new WriteTimeTransaction(idxrs.toArray(new Index.Indexer[idxrs.size()]));
}
/**
@@ -1148,7 +1213,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
int nowInSec)
{
// the check for whether there are any registered indexes is already done in CompactionIterator
- return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, listIndexes());
+ return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, writableIndexes.values());
}
/**
@@ -1161,7 +1226,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
if (!hasIndexes())
return CleanupTransaction.NO_OP;
- return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, listIndexes());
+ return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, writableIndexes.values());
}
/**
@@ -1283,7 +1348,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
private IndexGCTransaction(DecoratedKey key,
RegularAndStaticColumns columns,
- Keyspace keyspace, int versions,
+ Keyspace keyspace,
+ int versions,
int nowInSec,
Collection<Index> indexes)
{
@@ -1388,7 +1454,8 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
private CleanupGCTransaction(DecoratedKey key,
RegularAndStaticColumns columns,
- Keyspace keyspace, int nowInSec,
+ Keyspace keyspace,
+ int nowInSec,
Collection<Index> indexes)
{
this.key = key;
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 58056b9..f74a656 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -145,7 +144,7 @@ public abstract class CassandraIndex implements Index
Clustering clustering,
CellPath path,
ByteBuffer cellValue);
-
+
public ColumnMetadata getIndexedColumn()
{
return indexedColumn;
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 07327ea..592499e 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi;
import java.util.*;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import com.googlecode.concurrenttrees.common.Iterables;
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 8f3b97d..9563780 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -22,6 +22,8 @@ import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import com.google.common.collect.ImmutableSet;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
@@ -1053,6 +1055,84 @@ public class SecondaryIndexTest extends CQLTester
}
}
+ @Test // A Bad init could leave an index only accepting reads
+ public void testReadOnlyIndex() throws Throwable
+ {
+ // On successful initialization both reads and writes go through
+ String tableName = createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+ String indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + ReadOnlyOnFailureIndex.class.getName() + "'");
+ assertTrue(waitForIndex(keyspace(), tableName, indexName));
+ execute("SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+ ReadOnlyOnFailureIndex index = (ReadOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+ assertEquals(1, index.rowsInserted.size());
+
+ // Upon rebuild, both reads and writes still go through
+ getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+ assertEquals(1, index.rowsInserted.size());
+ execute("SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+ assertEquals(2, index.rowsInserted.size());
+ dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+
+ // On bad initial build writes are not forwarded to the index
+ ReadOnlyOnFailureIndex.failInit = true;
+ indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + ReadOnlyOnFailureIndex.class.getName() + "'");
+ index = (ReadOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+ assertTrue(waitForIndexBuilds(keyspace(), indexName));
+ assertInvalidThrow(IndexNotAvailableException.class, "SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+ assertEquals(0, index.rowsInserted.size());
+
+ // Upon recovery, we can index data again
+ index.reset();
+ getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+ assertEquals(2, index.rowsInserted.size());
+ execute("SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+ assertEquals(3, index.rowsInserted.size());
+ dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+ }
+
+ @Test // A Bad init could leave an index only accepting writes
+ public void testWriteOnlyIndex() throws Throwable
+ {
+ // On successful initialization both reads and writes go through
+ String tableName = createTable("CREATE TABLE %s (pk int, ck int, value int, PRIMARY KEY (pk, ck))");
+ String indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + WriteOnlyOnFailureIndex.class.getName() + "'");
+ assertTrue(waitForIndex(keyspace(), tableName, indexName));
+ execute("SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+ WriteOnlyOnFailureIndex index = (WriteOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+ assertEquals(1, index.rowsInserted.size());
+
+ // Upon rebuild, both reads and writes still go through
+ getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+ assertEquals(1, index.rowsInserted.size());
+ execute("SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+ assertEquals(2, index.rowsInserted.size());
+ dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+
+ // On bad initial build writes are forwarded to the index
+ WriteOnlyOnFailureIndex.failInit = true;
+ indexName = createIndex("CREATE CUSTOM INDEX ON %s (value) USING '" + WriteOnlyOnFailureIndex.class.getName() + "'");
+ index = (WriteOnlyOnFailureIndex) getCurrentColumnFamilyStore().indexManager.getIndexByName(indexName);
+ assertTrue(waitForIndexBuilds(keyspace(), indexName));
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 1, 1, 1);
+ assertEquals(1, index.rowsInserted.size());
+ assertInvalidThrow(IndexNotAvailableException.class, "SELECT value FROM %s WHERE value = 1");
+
+ // Upon recovery, we can query data again
+ index.reset();
+ getCurrentColumnFamilyStore().indexManager.rebuildIndexesBlocking(ImmutableSet.of(indexName));
+ assertEquals(2, index.rowsInserted.size());
+ execute("SELECT value FROM %s WHERE value = 1");
+ execute("INSERT INTO %s (pk, ck, value) VALUES (?, ?, ?)", 2, 1, 1);
+ assertEquals(3, index.rowsInserted.size());
+ dropIndex(format("DROP INDEX %s.%s", KEYSPACE, indexName));
+ }
+
@Test
public void droppingIndexInvalidatesPreparedStatements() throws Throwable
{
@@ -1375,6 +1455,7 @@ public class SecondaryIndexTest extends CQLTester
execute("INSERT INTO %s (k, v) VALUES (?, ?)", 0, udt1);
String indexName = createIndex("CREATE INDEX ON %s (v)");
+
execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, udt2);
execute("INSERT INTO %s (k, v) VALUES (?, ?)", 1, udt1);
assertTrue(waitForIndex(keyspace(), tableName, indexName));
@@ -1560,4 +1641,62 @@ public class SecondaryIndexTest extends CQLTester
return super.getInvalidateTask();
}
}
+
+ /**
+ * {@code StubIndex} that only supports some load. Could be intentional or a result of a bad init.
+ */
+ public static class LoadTypeConstrainedIndex extends StubIndex
+ {
+ static volatile boolean failInit = false;
+ final LoadType supportedLoadOnFailure;
+
+ LoadTypeConstrainedIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef, LoadType supportedLoadOnFailure)
+ {
+ super(baseCfs, indexDef);
+ this.supportedLoadOnFailure = supportedLoadOnFailure;
+ }
+
+ @Override
+ public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+ {
+ return supportedLoadOnFailure;
+ }
+
+ @Override
+ public void reset()
+ {
+ super.reset();
+ failInit = false;
+ }
+
+ @Override
+ public Callable<?> getInitializationTask()
+ {
+ if (failInit)
+ return () -> {throw new IllegalStateException("Index is configured to fail.");};
+
+ return null;
+ }
+
+ public boolean shouldBuildBlocking()
+ {
+ return true;
+ }
+ }
+
+ public static class ReadOnlyOnFailureIndex extends LoadTypeConstrainedIndex
+ {
+ public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef, LoadType.READ);
+ }
+ }
+
+ public static class WriteOnlyOnFailureIndex extends LoadTypeConstrainedIndex
+ {
+ public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef, LoadType.WRITE);
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index c09b16c..bfe1c6d 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -484,7 +484,7 @@ public class SecondaryIndexTest
.build();
MigrationManager.announceTableUpdate(updated, true);
- // fait for the index to be built
+ // wait for the index to be built
Index index = cfs.indexManager.getIndex(indexDef);
do
{
diff --git a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java
index 2207f48..d8fb99f 100644
--- a/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java
+++ b/test/unit/org/apache/cassandra/index/SecondaryIndexManagerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Test;
@@ -67,14 +68,14 @@ public class SecondaryIndexManagerTest extends CQLTester
}
@Test
- public void rebuildingIndexMarksTheIndexAsBuilt() throws Throwable
+ public void rebuilOrRecoveringIndexMarksTheIndexAsBuilt() throws Throwable
{
String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
String indexName = createIndex("CREATE INDEX ON %s(c)");
waitForIndex(KEYSPACE, tableName, indexName);
assertMarkedAsBuilt(indexName);
-
+
assertTrue(tryRebuild(indexName, false));
assertMarkedAsBuilt(indexName);
}
@@ -119,56 +120,70 @@ public class SecondaryIndexManagerTest extends CQLTester
}
@Test
- public void cannotRebuildWhileInitializationIsInProgress() throws Throwable
+ public void cannotRebuildRecoverWhileInitializationIsInProgress() throws Throwable
{
// create an index which blocks on creation
TestingIndex.blockCreate();
String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
- String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
-
- // try to rebuild the index before the index creation task has finished
- assertFalse(tryRebuild(indexName, false));
- assertNotMarkedAsBuilt(indexName);
+ String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+ String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
+
+ // try to rebuild/recover the index before the index creation task has finished
+ assertFalse(tryRebuild(defaultIndexName, false));
+ assertFalse(tryRebuild(readOnlyIndexName, false));
+ assertFalse(tryRebuild(writeOnlyIndexName, false));
+ assertNotMarkedAsBuilt(defaultIndexName);
+ assertNotMarkedAsBuilt(readOnlyIndexName);
+ assertNotMarkedAsBuilt(writeOnlyIndexName);
// check that the index is marked as built when the creation finishes
TestingIndex.unblockCreate();
- waitForIndex(KEYSPACE, tableName, indexName);
- assertMarkedAsBuilt(indexName);
-
- // now verify you can rebuild
- assertTrue(tryRebuild(indexName, false));
- assertMarkedAsBuilt(indexName);
+ waitForIndex(KEYSPACE, tableName, defaultIndexName);
+ waitForIndex(KEYSPACE, tableName, readOnlyIndexName);
+ waitForIndex(KEYSPACE, tableName, writeOnlyIndexName);
+ assertMarkedAsBuilt(defaultIndexName);
+ assertMarkedAsBuilt(readOnlyIndexName);
+ assertMarkedAsBuilt(writeOnlyIndexName);
+
+ // now verify you can rebuild/recover
+ assertTrue(tryRebuild(defaultIndexName, false));
+ assertTrue(tryRebuild(readOnlyIndexName, false));
+ assertTrue(tryRebuild(readOnlyIndexName, false));
+ assertMarkedAsBuilt(defaultIndexName);
+ assertMarkedAsBuilt(readOnlyIndexName);
+ assertMarkedAsBuilt(writeOnlyIndexName);
}
@Test
- public void cannotRebuildWhileAnotherRebuildIsInProgress() throws Throwable
+ public void cannotRebuildOrRecoverWhileAnotherRebuildIsInProgress() throws Throwable
{
- final String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
- final String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
+ String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+ String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(b) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
final AtomicBoolean error = new AtomicBoolean();
// wait for index initialization and verify it's built:
- waitForIndex(KEYSPACE, tableName, indexName);
- assertMarkedAsBuilt(indexName);
+ waitForIndex(KEYSPACE, tableName, defaultIndexName);
+ waitForIndex(KEYSPACE, tableName, readOnlyIndexName);
+ waitForIndex(KEYSPACE, tableName, writeOnlyIndexName);
+ assertMarkedAsBuilt(defaultIndexName);
+ assertMarkedAsBuilt(readOnlyIndexName);
+ assertMarkedAsBuilt(writeOnlyIndexName);
// rebuild the index in another thread, but make it block:
TestingIndex.blockBuild();
- Thread asyncBuild = new Thread()
- {
-
- @Override
- public void run()
+ Thread asyncBuild = new Thread(() -> {
+ try
{
- try
- {
- tryRebuild(indexName, false);
- }
- catch (Throwable ex)
- {
- error.set(true);
- }
+ tryRebuild(defaultIndexName, false);
}
- };
+ catch (Throwable ex)
+ {
+ error.set(true);
+ }
+ });
asyncBuild.start();
// wait for the rebuild to block, so that we can proceed unblocking all further operations:
@@ -178,17 +193,23 @@ public class SecondaryIndexManagerTest extends CQLTester
TestingIndex.shouldBlockBuild = false;
// verify rebuilding the index before the previous index build task has finished fails
- assertFalse(tryRebuild(indexName, false));
- assertNotMarkedAsBuilt(indexName);
+ assertFalse(tryRebuild(defaultIndexName, false));
+ assertNotMarkedAsBuilt(defaultIndexName);
// check that the index is marked as built when the build finishes
TestingIndex.unblockBuild();
asyncBuild.join();
- assertMarkedAsBuilt(indexName);
+ assertMarkedAsBuilt(defaultIndexName);
+ assertMarkedAsBuilt(readOnlyIndexName);
+ assertMarkedAsBuilt(writeOnlyIndexName);
// now verify you can rebuild
- assertTrue(tryRebuild(indexName, false));
- assertMarkedAsBuilt(indexName);
+ assertTrue(tryRebuild(defaultIndexName, false));
+ assertTrue(tryRebuild(readOnlyIndexName, false));
+ assertTrue(tryRebuild(writeOnlyIndexName, false));
+ assertMarkedAsBuilt(defaultIndexName);
+ assertMarkedAsBuilt(readOnlyIndexName);
+ assertMarkedAsBuilt(writeOnlyIndexName);
}
@Test
@@ -204,23 +225,17 @@ public class SecondaryIndexManagerTest extends CQLTester
// add sstables in another thread, but make it block:
TestingIndex.blockBuild();
- Thread asyncBuild = new Thread()
- {
-
- @Override
- public void run()
+ Thread asyncBuild = new Thread(() -> {
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL)))
{
- ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
- try (Refs<SSTableReader> sstables = Refs.ref(cfs.getSSTables(SSTableSet.CANONICAL)))
- {
- cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker());
- }
- catch (Throwable ex)
- {
- error.set(true);
- }
+ cfs.indexManager.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker());
+ }
+ catch (Throwable ex)
+ {
+ error.set(true);
}
- };
+ });
asyncBuild.start();
// wait for the build to block, so that we can proceed unblocking all further operations:
@@ -256,22 +271,16 @@ public class SecondaryIndexManagerTest extends CQLTester
// rebuild the index in another thread, but make it block:
TestingIndex.blockBuild();
- Thread asyncBuild = new Thread()
- {
-
- @Override
- public void run()
+ Thread asyncBuild = new Thread(() -> {
+ try
{
- try
- {
- tryRebuild(indexName, false);
- }
- catch (Throwable ex)
- {
- error.set(true);
- }
+ tryRebuild(indexName, false);
+ }
+ catch (Throwable ex)
+ {
+ error.set(true);
}
- };
+ });
asyncBuild.start();
// wait for the rebuild to block, so that we can proceed unblocking all further operations:
@@ -310,22 +319,16 @@ public class SecondaryIndexManagerTest extends CQLTester
// rebuild the index in another thread, but make it block:
TestingIndex.blockBuild();
- Thread asyncBuild = new Thread()
- {
-
- @Override
- public void run()
+ Thread asyncBuild = new Thread(() -> {
+ try
{
- try
- {
- tryRebuild(indexName, false);
- }
- catch (Throwable ex)
- {
- error.set(true);
- }
+ tryRebuild(indexName, false);
}
- };
+ catch (Throwable ex)
+ {
+ error.set(true);
+ }
+ });
asyncBuild.start();
// wait for the rebuild to block, so that we can proceed unblocking all further operations:
@@ -382,37 +385,57 @@ public class SecondaryIndexManagerTest extends CQLTester
}
@Test
- public void initializingIndexNotQueryable() throws Throwable
+ public void initializingIndexNotQueryableButMaybeWritable() throws Throwable
{
TestingIndex.blockCreate();
String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
- String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+ String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
// the index shouldn't be queryable while the initialization hasn't finished
- ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
- Index index = cfs.indexManager.getIndexByName(indexName);
- assertFalse(cfs.indexManager.isIndexQueryable(index));
+ assertFalse(isQueryable(defaultIndexName));
+ assertFalse(isQueryable(readOnlyIndexName));
+ assertFalse(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertTrue(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
// the index should be queryable once the initialization has finished
TestingIndex.unblockCreate();
- waitForIndex(KEYSPACE, tableName, indexName);
- assertTrue(cfs.indexManager.isIndexQueryable(index));
+ waitForIndex(KEYSPACE, tableName, defaultIndexName);
+ waitForIndex(KEYSPACE, tableName, readOnlyIndexName);
+ waitForIndex(KEYSPACE, tableName, writeOnlyIndexName);
+ assertTrue(isQueryable(defaultIndexName));
+ assertTrue(isQueryable(readOnlyIndexName));
+ assertTrue(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertTrue(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
}
@Test
- public void initializingIndexNotQueryableAfterPartialRebuild() throws Throwable
+ public void initializingIndexNotQueryableButMaybeNotWritableAfterPartialRebuild() throws Throwable
{
TestingIndex.blockCreate();
String tableName = createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
- String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+ String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
- // the index shouldn't be queryable while the initialization hasn't finished
- ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
- Index index = cfs.indexManager.getIndexByName(indexName);
- assertFalse(cfs.indexManager.isIndexQueryable(index));
+ // the index should never be queryable while the initialization hasn't finished
+ assertFalse(isQueryable(defaultIndexName));
+ assertFalse(isQueryable(readOnlyIndexName));
+ assertFalse(isQueryable(writeOnlyIndexName));
+
+ // the index should always we writable while the initialization hasn't finished
+ assertTrue(isWritable(defaultIndexName));
+ assertTrue(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
- // a failing partial build doesn't set the index as queryable
+ // a failing partial build doesn't set the index as queryable, but might set it as not writable
TestingIndex.shouldFailBuild = true;
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
try
{
cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this);
@@ -422,59 +445,99 @@ public class SecondaryIndexManagerTest extends CQLTester
{
assertTrue(ex.getMessage().contains("configured to fail"));
}
- assertFalse(cfs.indexManager.isIndexQueryable(index));
-
- // a successful partial build doesn't set the index as queryable
+ assertFalse(isQueryable(defaultIndexName));
+ assertFalse(isQueryable(readOnlyIndexName));
+ assertFalse(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertFalse(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
+
+ // a successful partial build doesn't set the index as queryable nor writable
TestingIndex.shouldFailBuild = false;
cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this);
- assertFalse(cfs.indexManager.isIndexQueryable(index));
+ assertFalse(isQueryable(defaultIndexName));
+ assertFalse(isQueryable(readOnlyIndexName));
+ assertFalse(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertFalse(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
// the index should be queryable once the initialization has finished
TestingIndex.unblockCreate();
- waitForIndex(KEYSPACE, tableName, indexName);
- assertTrue(cfs.indexManager.isIndexQueryable(index));
+ waitForIndex(KEYSPACE, tableName, defaultIndexName);
+ assertTrue(isQueryable(defaultIndexName));
+ assertTrue(isQueryable(readOnlyIndexName));
+ assertTrue(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertTrue(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
}
@Test
- public void indexWithFailedInitializationIsQueryableAfterFullRebuild() throws Throwable
+ public void indexWithFailedInitializationIsQueryableAndWritableAfterFullRebuild() throws Throwable
{
- createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
-
TestingIndex.shouldFailCreate = true;
- String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
-
- tryRebuild(indexName, true);
+ createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
+ String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+ String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
+ assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName));
+ assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName));
+ assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName));
+
+ tryRebuild(defaultIndexName, true);
+ tryRebuild(readOnlyIndexName, true);
+ tryRebuild(writeOnlyIndexName, true);
TestingIndex.shouldFailCreate = false;
- // a successfull full rebuild should set the index as queryable
+ // a successfull full rebuild should set the index as queryable/writable
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
- Index index = cfs.indexManager.getIndexByName(indexName);
- cfs.indexManager.rebuildIndexesBlocking(Collections.singleton(indexName));
- assertTrue(cfs.indexManager.isIndexQueryable(index));
+ cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(defaultIndexName, readOnlyIndexName, writeOnlyIndexName));
+ assertTrue(isQueryable(defaultIndexName));
+ assertTrue(isQueryable(readOnlyIndexName));
+ assertTrue(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertTrue(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
}
@Test
- public void indexWithFailedInitializationIsNotQueryableAfterPartialRebuild() throws Throwable
+ public void indexWithFailedInitializationDoesNotChangeQueryabilityNorWritabilityAfterPartialRebuild() throws Throwable
{
TestingIndex.shouldFailCreate = true;
createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))");
- String indexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
- assertTrue(waitForIndexBuilds(KEYSPACE, indexName));
+ String defaultIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", TestingIndex.class.getName()));
+ String readOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", ReadOnlyOnFailureIndex.class.getName()));
+ String writeOnlyIndexName = createIndex(String.format("CREATE CUSTOM INDEX ON %%s(c) USING '%s'", WriteOnlyOnFailureIndex.class.getName()));
+ assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName));
+ assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName));
+ assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName));
TestingIndex.shouldFailCreate = false;
- // the index shouldn't be queryable after the failed initialization
- ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
- Index index = cfs.indexManager.getIndexByName(indexName);
- assertFalse(cfs.indexManager.isIndexQueryable(index));
+ // the index should never be queryable, but it could be writable after the failed initialization
+ assertFalse(isQueryable(defaultIndexName));
+ assertFalse(isQueryable(readOnlyIndexName));
+ assertFalse(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertFalse(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
- // a successful partial build doesn't set the index as queryable
+ // a successful partial build doesn't set the index as queryable nor writable
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
cfs.indexManager.handleNotification(new SSTableAddedNotification(cfs.getLiveSSTables(), null), this);
- assertTrue(waitForIndexBuilds(KEYSPACE, indexName));
- assertFalse(cfs.indexManager.isIndexQueryable(index));
+ assertTrue(waitForIndexBuilds(KEYSPACE, defaultIndexName));
+ assertTrue(waitForIndexBuilds(KEYSPACE, readOnlyIndexName));
+ assertTrue(waitForIndexBuilds(KEYSPACE, writeOnlyIndexName));
+ assertFalse(isQueryable(defaultIndexName));
+ assertFalse(isQueryable(readOnlyIndexName));
+ assertFalse(isQueryable(writeOnlyIndexName));
+ assertTrue(isWritable(defaultIndexName));
+ assertFalse(isWritable(readOnlyIndexName));
+ assertTrue(isWritable(writeOnlyIndexName));
}
@Test
- public void handleJVMStablityOnFailedCreate() throws Throwable
+ public void handleJVMStablityOnFailedCreate()
{
handleJVMStablityOnFailedCreate(new SocketException("Should not fail"), false);
handleJVMStablityOnFailedCreate(new FileNotFoundException("Should not fail"), false);
@@ -483,7 +546,7 @@ public class SecondaryIndexManagerTest extends CQLTester
handleJVMStablityOnFailedCreate(new RuntimeException("Should not fail"), false);
}
- private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM) throws Throwable
+ private void handleJVMStablityOnFailedCreate(Throwable throwable, boolean shouldKillJVM)
{
KillerForTests killerForTests = new KillerForTests();
JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
@@ -584,71 +647,100 @@ public class SecondaryIndexManagerTest extends CQLTester
return done;
}
+ private boolean isQueryable(String indexName)
+ {
+ SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager;
+ Index index = manager.getIndexByName(indexName);
+ return manager.isIndexQueryable(index);
+ }
+
+ private boolean isWritable(String indexName)
+ {
+ SecondaryIndexManager manager = getCurrentColumnFamilyStore().indexManager;
+ Index index = manager.getIndexByName(indexName);
+ return manager.isIndexWritable(index);
+ }
+
public static class TestingIndex extends StubIndex
{
private static volatile CountDownLatch createLatch;
private static volatile CountDownLatch buildLatch;
private static volatile CountDownLatch createWaitLatch;
private static volatile CountDownLatch buildWaitLatch;
- public static volatile boolean shouldBlockCreate = false;
- public static volatile boolean shouldBlockBuild = false;
- public static volatile boolean shouldFailCreate = false;
- public static volatile boolean shouldFailBuild = false;
- public static volatile Throwable failedCreateThrowable;
- public static volatile Throwable failedBuildTrowable;
-
+ static volatile boolean shouldBlockCreate = false;
+ static volatile boolean shouldBlockBuild = false;
+ static volatile boolean shouldFailCreate = false;
+ static volatile boolean shouldFailBuild = false;
+ static volatile Throwable failedCreateThrowable;
+ static volatile Throwable failedBuildTrowable;
+
+ @SuppressWarnings("WeakerAccess")
public TestingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
{
super(baseCfs, metadata);
}
- public static void blockCreate()
+ static void blockCreate()
{
shouldBlockCreate = true;
createLatch = new CountDownLatch(1);
createWaitLatch = new CountDownLatch(1);
}
- public static void blockBuild()
+ static void blockBuild()
{
shouldBlockBuild = true;
buildLatch = new CountDownLatch(1);
buildWaitLatch = new CountDownLatch(1);
}
- public static void unblockCreate()
+ static void unblockCreate()
{
createLatch.countDown();
}
- public static void unblockBuild()
+ static void unblockBuild()
{
buildLatch.countDown();
}
- public static void waitBlockedOnCreate() throws InterruptedException
+ static void waitBlockedOnCreate() throws InterruptedException
{
createWaitLatch.await();
}
- public static void waitBlockedOnBuild() throws InterruptedException
+ static void waitBlockedOnBuild() throws InterruptedException
{
buildWaitLatch.await();
}
- public static void clear()
+ static void clear()
{
+ reset(createLatch);
+ reset(createWaitLatch);
+ reset(buildLatch);
+ reset(buildWaitLatch);
createLatch = null;
createWaitLatch = null;
buildLatch = null;
buildWaitLatch = null;
shouldBlockCreate = false;
shouldBlockBuild = false;
+ shouldFailCreate = false;
shouldFailBuild = false;
failedCreateThrowable = null;
failedBuildTrowable = null;
}
+ private static void reset(CountDownLatch latch)
+ {
+ if (latch == null)
+ return;
+
+ while (0L < latch.getCount())
+ latch.countDown();
+ }
+
public Callable<?> getInitializationTask()
{
return () ->
@@ -719,4 +811,38 @@ public class SecondaryIndexManagerTest extends CQLTester
return true;
}
}
+
+ /**
+ * <code>TestingIndex</code> that only supports reads when initial build or full rebuild has failed.
+ */
+ public static class ReadOnlyOnFailureIndex extends TestingIndex
+ {
+ public ReadOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ @Override
+ public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+ {
+ return LoadType.READ;
+ }
+ }
+
+ /**
+ * <code>TestingIndex</code> that only supports writes when initial build or full rebuild has failed.
+ */
+ public static class WriteOnlyOnFailureIndex extends TestingIndex
+ {
+ public WriteOnlyOnFailureIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ @Override
+ public LoadType getSupportedLoadTypeOnFailure(boolean isInitialBuild)
+ {
+ return LoadType.WRITE;
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index a351cce..02ccbff 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.OpOrder;
/**
* Basic custom index implementation for testing.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org