You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2017/06/23 10:02:11 UTC
[2/2] cassandra git commit: Improve secondary index (re)build failure
and concurrency handling
Improve secondary index (re)build failure and concurrency handling
patch by Andres de la Peña; reviewed by Paulo Motta and Sergio Bossa for CASSANDRA-10130
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/679c3171
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/679c3171
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/679c3171
Branch: refs/heads/trunk
Commit: 679c31718b709f5619bba80eeb6f388484b94c3c
Parents: a1c6a62
Author: Andrés de la Peña <a....@gmail.com>
Authored: Fri Jun 23 09:30:21 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Fri Jun 23 09:30:21 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 17 +-
.../db/compaction/CompactionManager.java | 2 +-
.../apache/cassandra/db/lifecycle/Tracker.java | 8 +-
.../cassandra/index/SecondaryIndexManager.java | 597 +++++++++++----
.../index/internal/CassandraIndex.java | 2 -
.../notifications/SSTableAddedNotification.java | 35 +-
.../cassandra/streaming/StreamReceiveTask.java | 4 +-
...pactionStrategyManagerPendingRepairTest.java | 10 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/db/lifecycle/TrackerTest.java | 6 +-
.../apache/cassandra/index/CustomIndexTest.java | 3 +-
.../index/SecondaryIndexManagerTest.java | 721 +++++++++++++++++++
.../index/internal/CassandraIndexTest.java | 1 +
.../index/internal/CustomCassandraIndex.java | 2 -
.../cassandra/index/sasi/SASIIndexTest.java | 12 +-
.../cassandra/stress/CompactionStress.java | 4 +
17 files changed, 1236 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82a0bda..ab06e2b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
* Improve calculation of available disk space for compaction (CASSANDRA-13068)
* Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
* Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dceb41d..893d525 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -410,7 +410,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
minCompactionThreshold = new DefaultValue<>(metadata.get().params.compaction.minCompactionThreshold());
maxCompactionThreshold = new DefaultValue<>(metadata.get().params.compaction.maxCompactionThreshold());
crcCheckChance = new DefaultValue<>(metadata.get().params.crcCheckChance);
- indexManager = new SecondaryIndexManager(this);
viewManager = keyspace.viewManager.forTable(metadata.id);
metric = new TableMetrics(this);
fileIndexGenerator.set(generation);
@@ -455,6 +454,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// create the private ColumnFamilyStores for the secondary column indexes
+ indexManager = new SecondaryIndexManager(this);
for (IndexMetadata info : metadata.get().indexes)
indexManager.addIndex(info);
@@ -567,7 +567,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.dropSSTables();
LifecycleTransaction.waitForDeletions();
- indexManager.invalidateAllIndexesBlocking();
+ indexManager.dropAllIndexes();
invalidateCaches();
}
@@ -800,7 +800,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
{
data.addSSTables(newSSTables);
- indexManager.buildAllIndexesBlocking(newSSTables);
}
logger.info("Done loading load new SSTables for {}/{}", keyspace.getName(), name);
@@ -815,14 +814,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
- Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
-
- Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL);
- try (Refs<SSTableReader> refs = Refs.ref(sstables))
- {
- logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames));
- cfs.indexManager.rebuildIndexesBlocking(refs, indexes);
- }
+ logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames));
+ cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(idxNames)));
}
public AbstractCompactionStrategy createCompactionStrategyInstance(CompactionParams compactionParams)
@@ -1451,7 +1444,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public void addSSTable(SSTableReader sstable)
{
assert sstable.getColumnFamilyName().equals(name);
- addSSTables(Arrays.asList(sstable));
+ addSSTables(Collections.singletonList(sstable));
}
public void addSSTables(Collection<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bc8b305..d7e00da 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1619,7 +1619,7 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Is not scheduled, because it is performing disjoint work from sstable compaction.
*/
- public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder)
+ public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder)
{
Runnable runnable = new Runnable()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index e2fcb06..d46ee60 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -359,7 +359,7 @@ public class Tracker
notifyDiscarded(memtable);
// TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
- fail = notifyAdded(sstables, fail);
+ fail = notifyAdded(sstables, memtable, fail);
if (!isDummy() && !cfstore.isValid())
dropSSTables();
@@ -417,9 +417,9 @@ public class Tracker
return accumulate;
}
- Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
+ Throwable notifyAdded(Iterable<SSTableReader> added, Memtable memtable, Throwable accumulate)
{
- INotification notification = new SSTableAddedNotification(added);
+ INotification notification = new SSTableAddedNotification(added, memtable);
for (INotificationConsumer subscriber : subscribers)
{
try
@@ -436,7 +436,7 @@ public class Tracker
public void notifyAdded(Iterable<SSTableReader> added)
{
- maybeFail(notifyAdded(added, null));
+ maybeFail(notifyAdded(added, null, null));
}
public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index f7b7d13..c2ed134 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -20,20 +20,29 @@ package org.apache.cassandra.index;
import java.lang.reflect.Constructor;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +62,9 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
@@ -60,6 +72,7 @@ import org.apache.cassandra.service.pager.SinglePartitionPager;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -67,7 +80,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
* Handles the core maintenance functionality associated with indexes: adding/removing them to or from
* a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
* and so on.
- *
+ * <br><br>
* The Index interface defines a number of methods which return {@code Callable<?>}. These are primarily the
* management tasks for an index implementation. Most of them are currently executed in a blocking
* fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
@@ -76,51 +89,76 @@ import org.apache.cassandra.utils.concurrent.Refs;
* then be defined with as void and called directly from SIM (rather than being run via the executor service).
* Separating the task defintion from execution gives us greater flexibility though, so that in future, for example,
* if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
- *
+ * <br><br>
* The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
* involve a significant effort, building a new index over any existing data. We perform this task asynchronously;
* as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
* indexes is performed on the CompactionManager.
- *
+ * <br><br>
* This class also provides instances of processors which listen to updates to the base table and forward to
* registered Indexes the info required to keep those indexes up to date.
* There are two variants of these processors, each with a factory method provided by SIM:
- * IndexTransaction: deals with updates generated on the regular write path.
- * CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
+ * IndexTransaction: deals with updates generated on the regular write path.
+ * CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
* Further details on their usage and lifecycles can be found in the interface definitions below.
- *
- * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
+ * <br><br>
+ * The bestIndexFor method is used at query time to identify the most selective index of those able
* to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
* which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
* e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
* distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
* a target replica.
+ * <br><br>
+ * Finally, this class provides a clear and safe lifecycle to manage index builds, either full rebuilds via
+ * {@link #rebuildIndexesBlocking(Set)} or builds of new sstables
+ * added via {@link org.apache.cassandra.notifications.SSTableAddedNotification}s, guaranteeing
+ * the following:
+ * <ul>
+ * <li>The initialization task and any subsequent successful (re)build mark the index as built.</li>
+ * <li>If any (re)build operation fails, the index is not marked as built, and only another full rebuild can mark the
+ * index as built.</li>
+ * <li>Full rebuilds cannot be run concurrently with other full or sstable (re)builds.</li>
+ * <li>SSTable builds can always be run concurrently with any other builds.</li>
+ * </ul>
*/
-public class SecondaryIndexManager implements IndexRegistry
+public class SecondaryIndexManager implements IndexRegistry, INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
// default page size (in rows) when rebuilding the index for a whole partition
public static final int DEFAULT_PAGE_SIZE = 10000;
- private Map<String, Index> indexes = Maps.newConcurrentMap();
+ /**
+ * All registered indexes.
+ */
+ private final Map<String, Index> indexes = Maps.newConcurrentMap();
+
+ /**
+ * The indexes that had a build failure.
+ */
+ private final Set<String> needsFullRebuild = Sets.newConcurrentHashSet();
+
+ /**
+ * The indexes that are available for querying.
+ */
+ private final Set<String> queryableIndexes = Sets.newConcurrentHashSet();
/**
- * The indexes that are ready to server requests.
+ * The count of pending index builds for each index.
*/
- private Set<String> builtIndexes = Sets.newConcurrentHashSet();
+ private final Map<String, AtomicInteger> inProgressBuilds = Maps.newConcurrentMap();
// executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
- private static final ExecutorService asyncExecutor =
- new JMXEnabledThreadPoolExecutor(1,
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new NamedThreadFactory("SecondaryIndexManagement"),
- "internal");
+ private static final ListeningExecutorService asyncExecutor = MoreExecutors.listeningDecorator(
+ new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("SecondaryIndexManagement"),
+ "internal"));
// executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
- private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
+ private static final ListeningExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
/**
* The underlying column family containing the source data for these indexes
@@ -130,9 +168,9 @@ public class SecondaryIndexManager implements IndexRegistry
public SecondaryIndexManager(ColumnFamilyStore baseCfs)
{
this.baseCfs = baseCfs;
+ baseCfs.getTracker().subscribe(this);
}
-
/**
* Drops and adds new indexes associated with the underlying CF
*/
@@ -160,27 +198,63 @@ public class SecondaryIndexManager implements IndexRegistry
: blockingExecutor.submit(reloadTask);
}
- private Future<?> createIndex(IndexMetadata indexDef)
+ @SuppressWarnings("unchecked")
+ private synchronized Future<?> createIndex(IndexMetadata indexDef)
{
- Index index = createInstance(indexDef);
+ final Index index = createInstance(indexDef);
+ String indexName = index.getIndexMetadata().name;
index.register(this);
+ // now mark as building prior to initializing
+ markIndexesBuilding(ImmutableSet.of(index), true);
+
+ Callable<?> initialBuildTask = null;
// if the index didn't register itself, we can probably assume that no initialization needs to happen
- final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name)
- ? index.getInitializationTask()
- : null;
+ if (indexes.containsKey(indexDef.name))
+ {
+ try
+ {
+ initialBuildTask = index.getInitializationTask();
+ }
+ catch (Throwable t)
+ {
+ logAndMarkIndexesFailed(Collections.singleton(index), t);
+ throw t;
+ }
+ }
+
+ // if there's no initialization, just mark as built and return:
if (initialBuildTask == null)
{
- // We need to make sure that the index is marked as built in the case where the initialBuildTask
- // does not need to be run (if the index didn't register itself or if the base table was empty).
- markIndexBuilt(indexDef.name);
+ markIndexBuilt(index, true);
return Futures.immediateFuture(null);
}
- return asyncExecutor.submit(index.getInitializationTask());
+
+ // otherwise run the initialization task asynchronously with a callback to mark it built or failed
+ final SettableFuture initialization = SettableFuture.create();
+ Futures.addCallback(asyncExecutor.submit(initialBuildTask), new FutureCallback()
+ {
+ @Override
+ public void onFailure(Throwable t)
+ {
+ logAndMarkIndexesFailed(Collections.singleton(index), t);
+ initialization.setException(t);
+ }
+
+ @Override
+ public void onSuccess(Object o)
+ {
+ markIndexBuilt(index, true);
+ initialization.set(o);
+ }
+ }, MoreExecutors.directExecutor());
+
+ return initialization;
}
/**
* Adds and builds a index
+ *
* @param indexDef the IndexMetadata describing the index
*/
public synchronized Future<?> addIndex(IndexMetadata indexDef)
@@ -195,11 +269,11 @@ public class SecondaryIndexManager implements IndexRegistry
* Checks if the specified index is queryable.
*
* @param index the index
- * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
+ * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
*/
public boolean isIndexQueryable(Index index)
{
- return builtIndexes.contains(index.getIndexMetadata().name);
+ return queryableIndexes.contains(index.getIndexMetadata().name);
}
public synchronized void removeIndex(String indexName)
@@ -208,7 +282,7 @@ public class SecondaryIndexManager implements IndexRegistry
if (null != index)
{
markIndexRemoved(indexName);
- executeBlocking(index.getInvalidateTask());
+ executeBlocking(index.getInvalidateTask(), null);
}
}
@@ -231,57 +305,35 @@ public class SecondaryIndexManager implements IndexRegistry
*/
public void markAllIndexesRemoved()
{
- getBuiltIndexNames().forEach(this::markIndexRemoved);
+ getBuiltIndexNames().forEach(this::markIndexRemoved);
}
/**
- * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
- * Caller must acquire and release references to the sstables used here.
- * Note also that only this method of (re)building indexes:
- * a) takes a set of index *names* rather than Indexers
- * b) marks exsiting indexes removed prior to rebuilding
- *
- * @param sstables the data to build from
- * @param indexNames the list of indexes to be rebuilt
- */
- public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
- {
- Set<Index> toRebuild = indexes.values().stream()
- .filter(index -> indexNames.contains(index.getIndexMetadata().name))
- .filter(Index::shouldBuildBlocking)
- .collect(Collectors.toSet());
- if (toRebuild.isEmpty())
- {
- logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
- return;
- }
-
- toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name));
-
- buildIndexesBlocking(sstables, toRebuild);
-
- toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name));
- }
-
- public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
- {
- buildIndexesBlocking(sstables, indexes.values()
- .stream()
- .filter(Index::shouldBuildBlocking)
- .collect(Collectors.toSet()));
- }
-
- // For convenience, may be called directly from Index impls
- public void buildIndexBlocking(Index index)
+ * Does a blocking full rebuild of the specified indexes from all the sstables in the base table.
+ * Note also that this method of (re)building indexes:
+ * a) takes a set of index *names* rather than Indexers
+ * b) marks existing indexes removed prior to rebuilding
+ * c) fails if such marking operation conflicts with any ongoing index builds, as full rebuilds cannot be run
+ * concurrently
+ *
+ * @param indexNames the list of indexes to be rebuilt
+ */
+ public void rebuildIndexesBlocking(Set<String> indexNames)
{
- if (index.shouldBuildBlocking())
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
+ Refs<SSTableReader> allSSTables = viewFragment.refs)
{
- try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
- Refs<SSTableReader> sstables = viewFragment.refs)
+ Set<Index> toRebuild = indexes.values().stream()
+ .filter(index -> indexNames.contains(index.getIndexMetadata().name))
+ .filter(Index::shouldBuildBlocking)
+ .collect(Collectors.toSet());
+ if (toRebuild.isEmpty())
{
- buildIndexesBlocking(sstables, Collections.singleton(index));
- markIndexBuilt(index.getIndexMetadata().name);
+ logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
+ return;
}
+
+ buildIndexesBlocking(allSSTables, toRebuild, true);
}
}
@@ -356,55 +408,255 @@ public class SecondaryIndexManager implements IndexRegistry
return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
}
- private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
+ /**
+ * Performs a blocking (re)indexing of the specified SSTables for the specified indexes.
+ *
+ * @param sstables the SSTables to be (re)indexed
+ * @param indexes the indexes to be (re)built for the specified SSTables
+ * @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise
+ */
+ @SuppressWarnings({ "unchecked" })
+ private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes, boolean isFullRebuild)
{
if (indexes.isEmpty())
return;
- logger.info("Submitting index build of {} for data in {}",
- indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
- sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+ // Mark all indexes as building: this step must happen first, because if any index can't be marked, the whole
+ // process needs to abort
+ markIndexesBuilding(indexes, isFullRebuild);
+
+ // Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed:
+ final Set<Index> builtIndexes = new HashSet<>();
+ final Set<Index> unbuiltIndexes = new HashSet<>();
- Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
- for (Index index : indexes)
+ // Any exception thrown during index building that could be suppressed by the finally block
+ Exception accumulatedFail = null;
+
+ try
{
- Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
- stored.add(index);
+ logger.info("Submitting index build of {} for data in {}",
+ indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
+ sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
+
+ // Group all building tasks
+ Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
+ for (Index index : indexes)
+ {
+ Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+ stored.add(index);
+ }
+
+ // Schedule all index building tasks with a callback to mark them as built or failed
+ List<Future<?>> futures = new ArrayList<>(byType.size());
+ byType.forEach((buildingSupport, groupedIndexes) ->
+ {
+ SecondaryIndexBuilder builder = buildingSupport.getIndexBuildTask(baseCfs, groupedIndexes, sstables);
+ final SettableFuture build = SettableFuture.create();
+ Futures.addCallback(CompactionManager.instance.submitIndexBuild(builder), new FutureCallback()
+ {
+ @Override
+ public void onFailure(Throwable t)
+ {
+ logAndMarkIndexesFailed(groupedIndexes, t);
+ unbuiltIndexes.addAll(groupedIndexes);
+ build.setException(t);
+ }
+
+ @Override
+ public void onSuccess(Object o)
+ {
+ groupedIndexes.forEach(i -> markIndexBuilt(i, isFullRebuild));
+ logger.info("Index build of {} completed", getIndexNames(groupedIndexes));
+ builtIndexes.addAll(groupedIndexes);
+ build.set(o);
+ }
+ });
+ futures.add(build);
+ });
+
+ // Finally wait for the index builds to finish and flush the indexes that built successfully
+ FBUtilities.waitOnFutures(futures);
}
+ catch (Exception e)
+ {
+ accumulatedFail = e;
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ // Fail any indexes that couldn't be marked
+ Set<Index> failedIndexes = Sets.difference(indexes, Sets.union(builtIndexes, unbuiltIndexes));
+ if (!failedIndexes.isEmpty())
+ {
+ logAndMarkIndexesFailed(failedIndexes, accumulatedFail);
+ }
- List<Future<?>> futures = byType.entrySet()
- .stream()
- .map((e) -> e.getKey().getIndexBuildTask(baseCfs, e.getValue(), sstables))
- .map(CompactionManager.instance::submitIndexBuild)
- .collect(Collectors.toList());
+ // Flush all built indexes with an asynchronous callback to log the success or failure of the flush
+ flushIndexesBlocking(builtIndexes, new FutureCallback()
+ {
+ String indexNames = StringUtils.join(builtIndexes.stream()
+ .map(i -> i.getIndexMetadata().name)
+ .collect(Collectors.toList()), ',');
- FBUtilities.waitOnFutures(futures);
+ @Override
+ public void onFailure(Throwable ignored)
+ {
+ logger.info("Index flush of {} failed", indexNames);
+ }
- flushIndexesBlocking(indexes);
- logger.info("Index build of {} complete",
- indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")));
+ @Override
+ public void onSuccess(Object ignored)
+ {
+ logger.info("Index flush of {} completed", indexNames);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ if (accumulatedFail != null)
+ {
+ accumulatedFail.addSuppressed(e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ }
+
+ private String getIndexNames(Set<Index> indexes)
+ {
+ List<String> indexNames = indexes.stream()
+ .map(i -> i.getIndexMetadata().name)
+ .collect(Collectors.toList());
+ return StringUtils.join(indexNames, ',');
}
/**
- * Marks the specified index as build.
- * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
- * @param indexName the index name
+ * Marks the specified indexes as (re)building if:
+ * 1) There's no in progress rebuild of any of the given indexes.
+ * 2) There's an in progress rebuild but the caller is not a full rebuild.
+ * <p>
+ * Otherwise, this method invocation fails, as it is not possible to run full rebuilds while other concurrent rebuilds
+ * are in progress. Please note this is checked atomically against all given indexes; that is, no index will be marked
+ * if even a single one fails.
+ * <p>
+ * Marking an index as "building" practically means:
+ * 1) The index is removed from the "failed" set if this is a full rebuild.
+ * 2) The index is removed from the system keyspace built indexes.
+ * <p>
+ * Thread safety is guaranteed by having all methods managing index builds synchronized: being synchronized on
+ * the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
+ * lock, but this is fine as the work done while holding such lock is trivial.
+ * <p>
+ * {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index)} should be always called after the
+ * rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
+ *
+ * @param indexes the indexes to be marked as building
+ * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
*/
- public void markIndexBuilt(String indexName)
+ private synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullRebuild)
{
- builtIndexes.add(indexName);
- if (DatabaseDescriptor.isDaemonInitialized())
- SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
+ String keyspaceName = baseCfs.keyspace.getName();
+
+ // First step is to validate against concurrent rebuilds; it would be more optimized to do everything on a single
+ // step, but we're not really expecting a very high number of indexes, and this isn't on any hot path, so
+ // we're favouring readability over performance
+ indexes.forEach(index ->
+ {
+ String indexName = index.getIndexMetadata().name;
+ AtomicInteger counter = inProgressBuilds.computeIfAbsent(indexName, ignored -> new AtomicInteger(0));
+
+ if (counter.get() > 0 && isFullRebuild)
+ throw new IllegalStateException(String.format("Cannot rebuild index %s as another index build for the same index is currently in progress.", indexName));
+ });
+
+ // Second step is the actual marking:
+ indexes.forEach(index ->
+ {
+ String indexName = index.getIndexMetadata().name;
+ AtomicInteger counter = inProgressBuilds.computeIfAbsent(indexName, ignored -> new AtomicInteger(0));
+
+ if (isFullRebuild)
+ needsFullRebuild.remove(indexName);
+
+ if (counter.getAndIncrement() == 0 && DatabaseDescriptor.isDaemonInitialized())
+ SystemKeyspace.setIndexRemoved(keyspaceName, indexName);
+ });
+ }
+
+ /**
+ * Marks the specified index as built if there are no in progress index builds and the index is not failed.
+ * {@link #markIndexesBuilding(Set, boolean)} should always be invoked before this method.
+ *
+ * @param index the index to be marked as built
+ * @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
+ */
+ private synchronized void markIndexBuilt(Index index, boolean isFullRebuild)
+ {
+ String indexName = index.getIndexMetadata().name;
+ AtomicInteger counter = inProgressBuilds.get(indexName);
+ if (counter != null)
+ {
+ assert counter.get() > 0;
+ if (counter.decrementAndGet() == 0)
+ {
+ if (isFullRebuild)
+ queryableIndexes.add(indexName);
+ inProgressBuilds.remove(indexName);
+ if (!needsFullRebuild.contains(indexName) && DatabaseDescriptor.isDaemonInitialized())
+ SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
+ }
+ }
+ }
+
+ /**
+ * Marks the specified index as failed.
+ * {@link #markIndexesBuilding(Set, boolean)} should always be invoked before this method.
+ *
+ * @param index the index to be marked as failed
+ */
+ private synchronized void markIndexFailed(Index index)
+ {
+ String indexName = index.getIndexMetadata().name;
+ AtomicInteger counter = inProgressBuilds.get(indexName);
+ if (counter != null)
+ {
+ assert counter.get() > 0;
+
+ counter.decrementAndGet();
+
+ if (DatabaseDescriptor.isDaemonInitialized())
+ SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
+
+ needsFullRebuild.add(indexName);
+ }
+ }
+
+ private void logAndMarkIndexesFailed(Set<Index> indexes, Throwable indexBuildFailure)
+ {
+ JVMStabilityInspector.inspectThrowable(indexBuildFailure);
+ if (indexBuildFailure != null)
+ logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes), indexBuildFailure);
+ else
+ logger.warn("Index build of {} failed. Please run full index rebuild to fix it.", getIndexNames(indexes));
+ indexes.forEach(SecondaryIndexManager.this::markIndexFailed);
}
/**
* Marks the specified index as removed.
- * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
+ *
* @param indexName the index name
*/
- public void markIndexRemoved(String indexName)
+ private synchronized void markIndexRemoved(String indexName)
{
SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
+ queryableIndexes.remove(indexName);
+ needsFullRebuild.remove(indexName);
+ inProgressBuilds.remove(indexName);
}
public Index getIndexByName(String indexName)
@@ -419,7 +671,7 @@ public class SecondaryIndexManager implements IndexRegistry
{
assert indexDef.options != null;
String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
- assert ! Strings.isNullOrEmpty(className);
+ assert !Strings.isNullOrEmpty(className);
try
{
Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
@@ -443,16 +695,22 @@ public class SecondaryIndexManager implements IndexRegistry
*/
public void truncateAllIndexesBlocking(final long truncatedAt)
{
- executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
+ executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt), null);
}
/**
* Remove all indexes
*/
- public void invalidateAllIndexesBlocking()
+ public void dropAllIndexes()
{
markAllIndexesRemoved();
- executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
+ invalidateAllIndexesBlocking();
+ }
+
+ @VisibleForTesting
+ public void invalidateAllIndexesBlocking()
+ {
+ executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask, null);
}
/**
@@ -460,7 +718,7 @@ public class SecondaryIndexManager implements IndexRegistry
*/
public void flushAllIndexesBlocking()
{
- flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
+ flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
}
/**
@@ -468,24 +726,7 @@ public class SecondaryIndexManager implements IndexRegistry
*/
public void flushIndexesBlocking(Set<Index> indexes)
{
- if (indexes.isEmpty())
- return;
-
- List<Future<?>> wait = new ArrayList<>();
- List<Index> nonCfsIndexes = new ArrayList<>();
-
- // for each CFS backed index, submit a flush task which we'll wait on for completion
- // for the non-CFS backed indexes, we'll flush those while we wait.
- synchronized (baseCfs.getTracker())
- {
- indexes.forEach(index ->
- index.getBackingTable()
- .map(cfs -> wait.add(cfs.forceFlush()))
- .orElseGet(() -> nonCfsIndexes.add(index)));
- }
-
- executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
- FBUtilities.waitOnFutures(wait);
+ flushIndexesBlocking(indexes, null);
}
/**
@@ -496,7 +737,7 @@ public class SecondaryIndexManager implements IndexRegistry
executeAllBlocking(indexes.values()
.stream()
.filter(index -> !index.getBackingTable().isPresent()),
- Index::getBlockingFlushTask);
+ Index::getBlockingFlushTask, null);
}
/**
@@ -505,9 +746,32 @@ public class SecondaryIndexManager implements IndexRegistry
public void executePreJoinTasksBlocking(boolean hadBootstrap)
{
logger.info("Executing pre-join{} tasks for: {}", hadBootstrap ? " post-bootstrap" : "", this.baseCfs);
- executeAllBlocking(indexes.values().stream(), (index) -> {
+ executeAllBlocking(indexes.values().stream(), (index) ->
+ {
return index.getPreJoinTask(hadBootstrap);
- });
+ }, null);
+ }
+
+ private void flushIndexesBlocking(Set<Index> indexes, FutureCallback<Object> callback)
+ {
+ if (indexes.isEmpty())
+ return;
+
+ List<Future<?>> wait = new ArrayList<>();
+ List<Index> nonCfsIndexes = new ArrayList<>();
+
+ // for each CFS backed index, submit a flush task which we'll wait on for completion
+ // for the non-CFS backed indexes, we'll flush those while we wait.
+ synchronized (baseCfs.getTracker())
+ {
+ indexes.forEach(index ->
+ index.getBackingTable()
+ .map(cfs -> wait.add(cfs.forceFlush()))
+ .orElseGet(() -> nonCfsIndexes.add(index)));
+ }
+
+ executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask, callback);
+ FBUtilities.waitOnFutures(wait);
}
/**
@@ -517,8 +781,8 @@ public class SecondaryIndexManager implements IndexRegistry
{
Set<String> allIndexNames = new HashSet<>();
indexes.values().stream()
- .map(i -> i.getIndexMetadata().name)
- .forEach(allIndexNames::add);
+ .map(i -> i.getIndexMetadata().name)
+ .forEach(allIndexNames::add);
return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
}
@@ -566,7 +830,8 @@ public class SecondaryIndexManager implements IndexRegistry
if (!page.hasNext())
break;
- try (UnfilteredRowIterator partition = page.next()) {
+ try (UnfilteredRowIterator partition = page.next())
+ {
Set<Index.Indexer> indexers = indexes.stream()
.map(index -> index.indexerFor(key,
partition.columns(),
@@ -665,7 +930,7 @@ public class SecondaryIndexManager implements IndexRegistry
/**
* Delete all data from all indexes for this partition.
* For when cleanup rips a partition out entirely.
- *
+ * <p>
* TODO : improve cleanup transaction to batch updates and perform them async
*/
public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
@@ -690,28 +955,28 @@ public class SecondaryIndexManager implements IndexRegistry
partition.columns(),
nowInSec);
indexTransaction.start();
- indexTransaction.onRowDelete((Row)unfiltered);
+ indexTransaction.onRowDelete((Row) unfiltered);
indexTransaction.commit();
}
}
/**
* Called at query time to choose which (if any) of the registered index implementations to use for a given query.
- *
+ * <p>
* This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces
* the search space the most.
- *
+ * <p>
* In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
* specify are automatically included. Following that, the registered indexes are filtered to include only those
* which support the standard expressions in the RowFilter.
- *
+ * <p>
* The filtered set then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows
* method.
- *
+ * <p>
* Implementation specific validation of the target expression, either custom or standard, by the selected
* index should be performed in the searcherFor method to ensure that we pick the right index regardless of
* the validity of the expression.
- *
+ * <p>
* This method is only called once during the lifecycle of a ReadCommand and the result is
* cached for future use when obtaining a Searcher, getting the index's underlying CFS for
* ReadOrderGroup, or an estimate of the result size from an average index query.
@@ -732,7 +997,7 @@ public class SecondaryIndexManager implements IndexRegistry
{
// Only a single custom expression is allowed per query and, if present,
// we want to always favour the index specified in such an expression
- RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
+ RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression) expression;
logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
return indexes.get(customExpression.getTargetIndex().name);
@@ -781,6 +1046,7 @@ public class SecondaryIndexManager implements IndexRegistry
* will process it. The partition key as well as the clustering and
* cell values for each row in the update may be checked by index
* implementations
+ *
* @param update PartitionUpdate containing the values to be validated by registered Index implementations
* @throws InvalidRequestException
*/
@@ -808,9 +1074,7 @@ public class SecondaryIndexManager implements IndexRegistry
private Index unregisterIndex(String name)
{
Index removed = indexes.remove(name);
- builtIndexes.remove(name);
- logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
- name);
+ logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry", name);
return removed;
}
@@ -882,7 +1146,7 @@ public class SecondaryIndexManager implements IndexRegistry
{
private final Index.Indexer[] indexers;
- private WriteTimeTransaction(Index.Indexer...indexers)
+ private WriteTimeTransaction(Index.Indexer... indexers)
{
// don't allow null indexers, if we don't need any use a NullUpdater object
for (Index.Indexer indexer : indexers) assert indexer != null;
@@ -945,7 +1209,6 @@ public class SecondaryIndexManager implements IndexRegistry
if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
toRemove.addCell(original);
-
}
};
Rows.diff(diffListener, updated, existing);
@@ -1011,7 +1274,7 @@ public class SecondaryIndexManager implements IndexRegistry
rows = new Row[versions];
}
- public void onRowMerge(Row merged, Row...versions)
+ public void onRowMerge(Row merged, Row... versions)
{
// Diff listener constructs rows representing deltas between the merged and original versions
// These delta rows are then passed to registered indexes for removal processing
@@ -1051,7 +1314,7 @@ public class SecondaryIndexManager implements IndexRegistry
Rows.diff(diffListener, merged, versions);
- for(int i = 0; i < builders.length; i++)
+ for (int i = 0; i < builders.length; i++)
if (builders[i] != null)
rows[i] = builders[i].build();
}
@@ -1147,13 +1410,17 @@ public class SecondaryIndexManager implements IndexRegistry
}
}
- private static void executeBlocking(Callable<?> task)
+ private void executeBlocking(Callable<?> task, FutureCallback<Object> callback)
{
if (null != task)
- FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+ {
+ ListenableFuture<?> f = blockingExecutor.submit(task);
+ if (callback != null) Futures.addCallback(f, callback);
+ FBUtilities.waitOnFuture(f);
+ }
}
- private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
+ private void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function, FutureCallback<Object> callback)
{
if (function == null)
{
@@ -1162,11 +1429,33 @@ public class SecondaryIndexManager implements IndexRegistry
}
List<Future<?>> waitFor = new ArrayList<>();
- indexers.forEach(indexer -> {
- Callable<?> task = function.apply(indexer);
- if (null != task)
- waitFor.add(blockingExecutor.submit(task));
- });
+ indexers.forEach(indexer ->
+ {
+ Callable<?> task = function.apply(indexer);
+ if (null != task)
+ {
+ ListenableFuture<?> f = blockingExecutor.submit(task);
+ if (callback != null) Futures.addCallback(f, callback);
+ waitFor.add(f);
+ }
+ });
FBUtilities.waitOnFutures(waitFor);
}
+
+ public void handleNotification(INotification notification, Object sender)
+ {
+ if (!indexes.isEmpty() && notification instanceof SSTableAddedNotification)
+ {
+ SSTableAddedNotification notice = (SSTableAddedNotification) notification;
+
+ // SSTables associated with a memtable come from a flush, so their contents have already been indexed
+ if (!notice.memtable().isPresent())
+ buildIndexesBlocking(Lists.newArrayList(notice.added),
+ indexes.values()
+ .stream()
+ .filter(Index::shouldBuildBlocking)
+ .collect(Collectors.toSet()),
+ false);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index c6f7d98..c7f3536 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -699,7 +699,6 @@ public abstract class CassandraIndex implements Index
baseCfs.metadata.keyspace,
baseCfs.metadata.name,
metadata.name);
- baseCfs.indexManager.markIndexBuilt(metadata.name);
return;
}
@@ -713,7 +712,6 @@ public abstract class CassandraIndex implements Index
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
FBUtilities.waitOnFuture(future);
indexCfs.forceBlockingFlush();
- baseCfs.indexManager.markIndexBuilt(metadata.name);
}
logger.info("Index build of {} complete", metadata.name);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
index 56d6130..9c95a18 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
@@ -17,13 +17,46 @@
*/
package org.apache.cassandra.notifications;
+import java.util.Optional;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+/**
+ * Notification sent after SSTables are added to their {@link org.apache.cassandra.db.ColumnFamilyStore}.
+ */
public class SSTableAddedNotification implements INotification
{
+ /** The added SSTables */
public final Iterable<SSTableReader> added;
- public SSTableAddedNotification(Iterable<SSTableReader> added)
+
+ /** The memtable from which the tables come when they have been added due to a flush, {@code null} otherwise. */
+ @Nullable
+ private final Memtable memtable;
+
+ /**
+ * Creates a new {@code SSTableAddedNotification} for the specified SSTables and optional memtable.
+ *
+ * @param added the added SSTables
+ * @param memtable the memtable from which the tables come when they have been added due to a memtable flush,
+ * or {@code null} if they don't come from a flush
+ */
+ public SSTableAddedNotification(Iterable<SSTableReader> added, @Nullable Memtable memtable)
{
this.added = added;
+ this.memtable = memtable;
+ }
+
+ /**
+ * Returns the memtable from which the tables come when they have been added due to a memtable flush. If not, an
+ * empty Optional is returned.
+ *
+ * @return the origin memtable in case of a flush, {@link Optional#empty()} otherwise
+ */
+ public Optional<Memtable> memtable()
+ {
+ return Optional.ofNullable(memtable);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 34e7cc8..925dc85 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -231,9 +230,8 @@ public class StreamReceiveTask extends StreamTask
{
task.finishTransaction();
- // add sstables and build secondary indexes
+ // add sstables (this will build secondary indexes too, see CASSANDRA-10130)
cfs.addSSTables(readers);
- cfs.indexManager.buildAllIndexesBlocking(readers);
//invalidate row and counter cache
if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index 0b27f73..af629e5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -87,7 +87,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
// add the sstable
- csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
Assert.assertFalse(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
@@ -181,7 +181,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID);
- csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
Assert.assertTrue(pendingContains(repairID, sstable));
// delete sstable
@@ -210,7 +210,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID);
- csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
strategies = csm.getStrategies();
Assert.assertEquals(3, strategies.size());
@@ -228,7 +228,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID);
- csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.finalizeUnsafe(repairID);
csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
Assert.assertNotNull(pendingContains(repairID, sstable));
@@ -265,7 +265,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
SSTableReader sstable = makeSSTable(true);
mutateRepaired(sstable, repairID);
- csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable)), cfs.getTracker());
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
LocalSessionAccessor.failUnsafe(repairID);
csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 360a2cd..624f119 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -378,7 +378,7 @@ public class LeveledCompactionStrategyTest
assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
unrepaired.removeSSTable(sstable2);
- manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
+ manager.handleNotification(new SSTableAddedNotification(singleton(sstable2), null), this);
assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index a5a1baf..93ee198 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.lifecycle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -305,6 +306,7 @@ public class TrackerTest
Assert.assertEquals(2, listener.received.size());
Assert.assertEquals(prev2, ((MemtableDiscardedNotification) listener.received.get(0)).memtable);
Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(1)).added);
+ Assert.assertEquals(Optional.of(prev2), ((SSTableAddedNotification) listener.received.get(1)).memtable());
listener.received.clear();
Assert.assertTrue(reader.isKeyCacheSetup());
Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@ -326,6 +328,7 @@ public class TrackerTest
Assert.assertEquals(prev1, ((MemtableSwitchedNotification) listener.received.get(0)).memtable);
Assert.assertEquals(prev1, ((MemtableDiscardedNotification) listener.received.get(1)).memtable);
Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(2)).added);
+ Assert.assertEquals(Optional.of(prev1), ((SSTableAddedNotification) listener.received.get(2)).memtable());
Assert.assertTrue(listener.received.get(3) instanceof SSTableDeletingNotification);
Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(4)).removed.size());
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -360,8 +363,9 @@ public class TrackerTest
MockListener failListener = new MockListener(true);
tracker.subscribe(failListener);
tracker.subscribe(listener);
- Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null));
+ Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null, null));
Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
+ Assert.assertFalse(((SSTableAddedNotification) listener.received.get(0)).memtable().isPresent());
listener.received.clear();
Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/679c3171/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index ed999fa..d14b50d 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
import org.junit.Test;
import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -111,7 +112,7 @@ public class CustomIndexTest extends CQLTester
excluded.reset();
assertTrue(excluded.rowsInserted.isEmpty());
- indexManager.buildAllIndexesBlocking(getCurrentColumnFamilyStore().getLiveSSTables());
+ indexManager.rebuildIndexesBlocking(Sets.newHashSet(toInclude, toExclude));
assertEquals(3, included.rowsInserted.size());
assertTrue(excluded.rowsInserted.isEmpty());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org