You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/12 14:49:18 UTC
[ignite] branch master updated: IGNITE-14321 Fix force index
rebuilding (#8962)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 43a7a1a IGNITE-14321 Fix force index rebuilding (#8962)
43a7a1a is described below
commit 43a7a1a14cec08ec1bbe706a57753b7498cc04d2
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Apr 12 17:49:01 2021 +0300
IGNITE-14321 Fix force index rebuilding (#8962)
---
.../GridCommandHandlerIndexForceRebuildTest.java | 8 +-
.../internal/cache/query/index/IndexProcessor.java | 9 +-
.../managers/indexing/IndexesRebuildTask.java | 53 ++--
.../preloader/GridDhtPartitionsExchangeFuture.java | 7 +
.../GridCacheDatabaseSharedManager.java | 74 +++--
.../IgniteCacheDatabaseSharedManager.java | 10 +-
.../processors/query/GridQueryProcessor.java | 143 +++++++---
.../processors/query/IndexRebuildAware.java | 145 ++++++++++
.../junits/common/GridCommonAbstractTest.java | 38 +++
.../cache/index/ForceRebuildIndexTest.java | 317 +++++++++++++++++++++
.../cache/index/IndexesRebuildTaskEx.java | 207 ++++++++++++++
.../cache/index/StopRebuildIndexTest.java | 191 ++-----------
.../IgnitePdsIndexingDefragmentationTest.java | 5 +-
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
14 files changed, 950 insertions(+), 261 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
index 904055fd..4d8b181 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
@@ -406,15 +406,15 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
waitForIndexesRebuild(n);
+ intlRebIdxFut.get(getTestTimeout());
+ destroyCacheFut.get(getTestTimeout());
+ putCacheFut.get(getTestTimeout());
+
injectTestSystemOut();
assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "--check-crc", cacheName1));
assertContains(log, testOut.toString(), "no issues found.");
-
- intlRebIdxFut.get(getTestTimeout());
- destroyCacheFut.get(getTestTimeout());
- putCacheFut.get(getTestTimeout());
}
finally {
stopLoad.set(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
index f10b0f2..1b16e33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
@@ -341,9 +341,13 @@ public class IndexProcessor extends GridProcessorAdapter {
/**
* Start rebuild of indexes for specified cache.
+ *
+ * @param cctx Cache context.
+ * @param force Force rebuild indexes.
+ * @return A future of rebuilding cache indexes.
*/
- public IgniteInternalFuture<?> rebuildIndexesForCache(GridCacheContext<?, ?> cctx) {
- return idxRebuild.rebuild(cctx);
+ @Nullable public IgniteInternalFuture<?> rebuildIndexesForCache(GridCacheContext<?, ?> cctx, boolean force) {
+ return idxRebuild.rebuild(cctx, force);
}
/** */
@@ -555,6 +559,7 @@ public class IndexProcessor extends GridProcessorAdapter {
/**
* Collect indexes for rebuild.
*
+ * @param cctx Cache context.
* @param createdOnly Get only created indexes (not restored from disk).
*/
public List<InlineIndex> treeIndexes(GridCacheContext cctx, boolean createdOnly) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
index df70a128..6cb3850 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
@@ -38,9 +38,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
-
/**
* Task that rebuilds indexes.
*/
@@ -48,9 +45,15 @@ public class IndexesRebuildTask {
/** Index rebuilding futures for caches. Mapping: cacheId -> rebuild indexes future. */
private final Map<Integer, SchemaIndexCacheFuture> idxRebuildFuts = new ConcurrentHashMap<>();
- /** Start to rebuild. */
- public IgniteInternalFuture<?> rebuild(GridCacheContext cctx) {
- assert nonNull(cctx);
+ /**
+ * Start to rebuild.
+ *
+ * @param cctx Cache context.
+ * @param force Force rebuild indexes.
+ * @return A future of rebuilding cache indexes.
+ */
+ @Nullable public IgniteInternalFuture<?> rebuild(GridCacheContext cctx, boolean force) {
+ assert cctx != null;
if (!CU.affinityNode(cctx.localNode(), cctx.config().getNodeFilter()))
return null;
@@ -68,7 +71,7 @@ public class IndexesRebuildTask {
clo = row -> cctx.queries().store(row, null, mvccEnabled);
}
else {
- Collection<InlineIndex> toRebuild = cctx.kernalContext().indexProcessor().treeIndexes(cctx, true);
+ Collection<InlineIndex> toRebuild = cctx.kernalContext().indexProcessor().treeIndexes(cctx, !force);
if (F.isEmpty(toRebuild))
return null;
@@ -84,17 +87,20 @@ public class IndexesRebuildTask {
// To avoid possible data race.
GridFutureAdapter<Void> outRebuildCacheIdxFut = new GridFutureAdapter<>();
- // An internal future for the ability to cancel index rebuilding.
- // This behavior should be discussed in IGNITE-14321.
IgniteLogger log = cctx.kernalContext().grid().log();
+ // An internal future for the ability to cancel index rebuilding.
SchemaIndexCacheFuture intRebFut = new SchemaIndexCacheFuture(new SchemaIndexOperationCancellationToken());
- cancelIndexRebuildFuture(idxRebuildFuts.put(cctx.cacheId(), intRebFut), log);
+
+ SchemaIndexCacheFuture prevIntRebFut = idxRebuildFuts.put(cctx.cacheId(), intRebFut);
+
+ // Check that the previous rebuild is completed.
+ assert prevIntRebFut == null;
rebuildCacheIdxFut.listen(fut -> {
Throwable err = fut.error();
- if (isNull(err)) {
+ if (err == null) {
try {
cctx.kernalContext().query().markAsRebuildNeeded(cctx, false);
}
@@ -103,13 +109,13 @@ public class IndexesRebuildTask {
}
}
- if (nonNull(err))
+ if (err != null)
U.error(log, "Failed to rebuild indexes for cache: " + cacheName, err);
- outRebuildCacheIdxFut.onDone(err);
-
idxRebuildFuts.remove(cctx.cacheId(), intRebFut);
intRebFut.onDone(err);
+
+ outRebuildCacheIdxFut.onDone(err);
});
startRebuild(cctx, rebuildCacheIdxFut, clo, intRebFut.cancelToken());
@@ -117,9 +123,20 @@ public class IndexesRebuildTask {
return outRebuildCacheIdxFut;
}
- /** Actual start rebuilding. Use this method for test purposes only. */
- protected void startRebuild(GridCacheContext cctx, GridFutureAdapter<Void> fut,
- SchemaIndexCacheVisitorClosure clo, SchemaIndexOperationCancellationToken cancel) {
+ /**
+ * Actual start rebuilding. Use this method for test purposes only.
+ *
+ * @param cctx Cache context.
+ * @param fut Future for rebuild indexes.
+ * @param clo Closure.
+ * @param cancel Cancellation token.
+ */
+ protected void startRebuild(
+ GridCacheContext cctx,
+ GridFutureAdapter<Void> fut,
+ SchemaIndexCacheVisitorClosure clo,
+ SchemaIndexOperationCancellationToken cancel
+ ) {
new SchemaIndexCacheVisitorImpl(cctx, cancel, fut).visit(clo);
}
@@ -127,6 +144,7 @@ public class IndexesRebuildTask {
* Stop rebuilding indexes.
*
* @param cacheInfo Cache context info.
+ * @param log Logger.
*/
public void stopRebuild(GridCacheContextInfo cacheInfo, IgniteLogger log) {
cancelIndexRebuildFuture(idxRebuildFuts.remove(cacheInfo.cacheId()), log);
@@ -136,6 +154,7 @@ public class IndexesRebuildTask {
* Cancel rebuilding indexes for the cache through a future.
*
* @param rebFut Index rebuilding future.
+ * @param log Logger.
*/
private void cancelIndexRebuildFuture(@Nullable SchemaIndexCacheFuture rebFut, IgniteLogger log) {
if (rebFut != null && !rebFut.isDone() && rebFut.cancelToken().cancel()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index dcb131f..e63d78a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2429,6 +2429,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
span.addTag(SpanTags.ERROR, errf::toString);
}
+ boolean cleanIdxRebuildFutures = true;
+
try {
waitUntilNewCachesAreRegistered();
@@ -2529,6 +2531,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (err == null) {
cctx.database().rebuildIndexesIfNeeded(this);
+ cleanIdxRebuildFutures = false;
+
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
@@ -2548,6 +2552,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
final Throwable err0 = err;
+ if (err0 != null && cleanIdxRebuildFutures)
+ cctx.kernalContext().query().removeIndexRebuildFuturesOnExchange(this, null);
+
// Should execute this listener first, before any external listeners.
// Listeners use stack as data structure.
listen(f -> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 87b15b6..e6ca08c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -166,8 +166,11 @@ import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
@@ -403,7 +406,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
catch (IgniteCheckedException e) {
log.warning("Metastore iteration error", e);
- return Collections.emptyList();
+ return emptyList();
}
}, identity());
}
@@ -1429,25 +1432,55 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (defrgMgr != null)
return;
- rebuildIndexes(cctx.cacheContexts(), (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()));
+ Collection<GridCacheContext> rejected = rebuildIndexes(
+ cctx.cacheContexts(),
+ (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()) &&
+ cctx.kernalContext().query().rebuildIndexOnExchange(cacheCtx.cacheId(), exchangeFut),
+ false
+ );
+
+ if (!rejected.isEmpty()) {
+ cctx.kernalContext().query().removeIndexRebuildFuturesOnExchange(
+ exchangeFut,
+ rejected.stream().map(GridCacheContext::cacheId).collect(toSet())
+ );
+ }
}
/** {@inheritDoc} */
- @Override public void forceRebuildIndexes(Collection<GridCacheContext> contexts) {
- contexts.forEach(ctx -> cctx.kernalContext().query().prepareIndexRebuildFuture(ctx.cacheId()));
+ @Override public Collection<GridCacheContext> forceRebuildIndexes(Collection<GridCacheContext> contexts) {
+ Set<Integer> cacheIds = contexts.stream().map(GridCacheContext::cacheId).collect(toSet());
- rebuildIndexes(contexts, (cacheCtx) -> true);
+ Set<Integer> rejected = cctx.kernalContext().query().prepareRebuildIndexes(cacheIds);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Preparing features of rebuilding indexes for caches on force rebuild [requested=" + cacheIds +
+ ", rejected=" + rejected + ']');
+ }
+
+ rebuildIndexes(contexts, (cacheCtx) -> !rejected.contains(cacheCtx.cacheId()), true);
+
+ return rejected.isEmpty() ? emptyList() :
+ contexts.stream().filter(ctx -> rejected.contains(ctx.cacheId())).collect(toList());
}
/**
+ * Rebuilding indexes for caches.
+ *
* @param contexts Collection of cache contexts for which indexes should be rebuilt.
* @param rebuildCond Condition that should be met for indexes to be rebuilt for specific cache.
+ * @param force Force rebuild indexes.
+ * @return Cache contexts that did not satisfy {@code rebuildCond}.
*/
- private void rebuildIndexes(Collection<GridCacheContext> contexts, Predicate<GridCacheContext> rebuildCond) {
+ private Collection<GridCacheContext> rebuildIndexes(
+ Collection<GridCacheContext> contexts,
+ Predicate<GridCacheContext> rebuildCond,
+ boolean force
+ ) {
GridQueryProcessor qryProc = cctx.kernalContext().query();
if (!qryProc.moduleEnabled())
- return;
+ return emptyList();
GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
contexts.size(),
@@ -1458,17 +1491,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
1 //need at least 1 index rebuilded to print message about rebuilding completion
);
+ Collection<GridCacheContext> rejected = null;
+
for (GridCacheContext cacheCtx : contexts) {
- if (!rebuildCond.test(cacheCtx))
- continue;
+ if (rebuildCond.test(cacheCtx)) {
+ IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx, force);
- IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
+ if (nonNull(rebuildFut))
+ rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
+ else
+ rebuildIndexesCompleteCntr.countDown(false);
+ }
+ else {
+ if (rejected == null)
+ rejected = new ArrayList<>();
- if (nonNull(rebuildFut))
- rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
- else
- rebuildIndexesCompleteCntr.countDown(false);
+ rejected.add(cacheCtx);
+ }
}
+
+ return rejected == null ? emptyList() : rejected;
}
/**
@@ -1492,7 +1534,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
List<Integer> stoppedGrpIds = stoppedGrps.stream()
.filter(IgniteBiTuple::get2)
.map(t -> t.get1().groupId())
- .collect(Collectors.toList());
+ .collect(toList());
cctx.snapshotMgr().onCacheGroupsStopped(stoppedGrpIds);
@@ -3545,7 +3587,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.apply(cctx.cacheContext(cacheId).groupId());
})
- .collect(Collectors.toList());
+ .collect(toList());
return record.setWriteEntries(filteredEntries);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 207ef8c..b6886ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1085,12 +1085,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
}
/**
- * Performs indexes rebuild for all cache contexts from {@code contexts}.
+ * Initiate an asynchronous forced index rebuild for caches.
*
- * @param contexts List of cache contexts.
+ * @param contexts Cache contexts.
+ * @return Cache contexts for which index rebuilding is not initiated by
+ * this call because they are already in the process of rebuilding.
*/
- public void forceRebuildIndexes(Collection<GridCacheContext> contexts) {
- // No-op.
+ public Collection<GridCacheContext> forceRebuildIndexes(Collection<GridCacheContext> contexts) {
+ return Collections.emptyList();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f335444..6f99fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -56,7 +56,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -133,9 +132,12 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static java.util.Collections.emptySet;
import static java.util.Collections.newSetFromMap;
+import static java.util.Collections.singleton;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA;
@@ -204,9 +206,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Active propose messages. */
private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = new LinkedHashMap<>();
- /** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
- private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
-
/** General state mutex. */
private final Object stateMux = new Object();
@@ -242,6 +241,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Cache name - value typeId pairs for which type mismatch message was logged. */
private final Set<Long> missedCacheTypes = ConcurrentHashMap.newKeySet();
+ /** Index rebuild aware. */
+ private final IndexRebuildAware idxRebuildAware = new IndexRebuildAware();
+
/**
* @param ctx Kernal context.
*/
@@ -454,33 +456,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param fut Exchange future.
*/
public void beforeExchange(GridDhtPartitionsExchangeFuture fut) {
- ExchangeActions acts = fut.exchangeActions();
-
- if (acts != null) {
- if (!F.isEmpty(acts.cacheStartRequests())) {
- for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests())
- prepareIndexRebuildFuture(CU.cacheId(actionData.request().cacheName()));
- }
- else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
- for (T2<DynamicCacheDescriptor, NearCacheConfiguration> tup : acts.localJoinContext().caches())
- prepareIndexRebuildFuture(tup.get1().cacheId());
- }
- }
- }
+ Set<Integer> cacheIds = rebuildIndexCacheIds(fut);
- /**
- * Creates a new index rebuild future that should be completed later after exchange is done. The future
- * has to be created before exchange is initialized to guarantee that we will capture a correct future
- * after activation or restore completes.
- * If there was an old future for the given ID, it will be completed.
- *
- * @param cacheId Cache ID.
- */
- public void prepareIndexRebuildFuture(int cacheId) {
- GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());
+ Set<Integer> rejected = idxRebuildAware.prepareRebuildIndexes(cacheIds, fut.initialVersion());
- if (old != null)
- old.onDone();
+ if (log.isDebugEnabled()) {
+ log.debug("Preparing features of rebuilding indexes for caches on exchange [requested=" + cacheIds +
+ ", rejected=" + rejected + ']');
+ }
}
/**
@@ -1963,7 +1946,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false);
}
- rebuildIndexesFromHash0(cacheInfo.cacheContext());
+ if (idxRebuildAware.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), null).isEmpty())
+ rebuildIndexesFromHash0(cacheInfo.cacheContext(), false);
+ else {
+ if (log.isInfoEnabled())
+ log.info("Rebuilding indexes for the cache is already in progress: " + cacheInfo.name());
+ }
}
else
throw new SchemaOperationException("Unsupported operation: " + op);
@@ -2328,9 +2316,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* Rebuilds indexes for provided caches from corresponding hash indexes.
*
* @param cctx Cache context.
+ * @param force Force rebuild indexes.
* @return Future that will be completed when rebuilding is finished.
*/
- public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+ public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx, boolean force) {
assert nonNull(cctx);
// Indexing module is disabled, nothing to rebuild.
@@ -2357,7 +2346,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
try {
- return rebuildIndexesFromHash0(cctx);
+ return rebuildIndexesFromHash0(cctx, force);
}
finally {
busyLock.leaveBusy();
@@ -2365,10 +2354,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Rebuild indexes for cache.
+ *
* @param cctx Cache context.
+ * @param force Force rebuild indexes.
*/
- private IgniteInternalFuture<?> rebuildIndexesFromHash0(GridCacheContext<?, ?> cctx) {
- IgniteInternalFuture<?> idxFut = idxProc.rebuildIndexesForCache(cctx);
+ private IgniteInternalFuture<?> rebuildIndexesFromHash0(GridCacheContext<?, ?> cctx, boolean force) {
+ IgniteInternalFuture<?> idxFut = idxProc.rebuildIndexesForCache(cctx, force);
return chainIndexRebuildFuture(idxFut, cctx);
}
@@ -2384,19 +2376,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Nullable IgniteInternalFuture<?> idxFut,
GridCacheContext<?, ?> cctx
) {
- int cacheId = cctx.cacheId();
+ GridFutureAdapter<Void> res = idxRebuildAware.indexRebuildFuture(cctx.cacheId());
- if (nonNull(idxFut)) {
- GridFutureAdapter<Void> res = idxRebuildFuts.computeIfAbsent(cacheId, id -> new GridFutureAdapter<>());
+ assert res != null;
+ if (idxFut != null) {
String cacheInfo = "[name=" + cctx.name() + ", grpName=" + cctx.group().name() + "]";
if (log.isInfoEnabled())
log.info("Started indexes rebuilding for cache " + cacheInfo);
idxFut.listen(fut -> {
- idxRebuildFuts.remove(cacheId, res);
-
Throwable err = fut.error();
if (isNull(err) && log.isInfoEnabled())
@@ -2404,16 +2394,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
else if (!(err instanceof NodeStoppingException))
log.error("Failed to rebuild indexes for cache " + cacheInfo, err);
- res.onDone(err);
+ idxRebuildAware.onFinishRebuildIndexes(cctx.cacheId(), err);
});
return res;
}
else {
- GridFutureAdapter<Void> fut = idxRebuildFuts.remove(cacheId);
-
- if (fut != null)
- fut.onDone();
+ idxRebuildAware.onFinishRebuildIndexes(cctx.cacheId(), null);
return null;
}
@@ -2423,7 +2410,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Future that will be completed when indexes for given cache are restored.
*/
@Nullable public IgniteInternalFuture<?> indexRebuildFuture(int cacheId) {
- return idxRebuildFuts.get(cacheId);
+ return idxRebuildAware.indexRebuildFuture(cacheId);
}
/**
@@ -3808,4 +3795,70 @@ public class GridQueryProcessor extends GridProcessorAdapter {
this.mgr = mgr;
}
}
+
+ /**
+ * Removing futures of rebuilding indexes that should have been rebuilt on the exchange.
+ *
+ * @param fut Exchange future.
+ * @param cacheIds Cache ids for which futures will be deleted,
+ * if {@code null} then ids will be taken from the {@code fut}.
+ */
+ public void removeIndexRebuildFuturesOnExchange(
+ GridDhtPartitionsExchangeFuture fut,
+ @Nullable Set<Integer> cacheIds
+ ) {
+ idxRebuildAware.cancelRebuildIndexesOnExchange(
+ cacheIds != null ? cacheIds : rebuildIndexCacheIds(fut),
+ fut.initialVersion()
+ );
+ }
+
+ /**
+ * Checks that the indexes need to be rebuilt on the exchange.
+ *
+ * @param cacheId Cache id.
+ * @param fut Exchange future.
+ * @return {@code True} if need to rebuild.
+ */
+ public boolean rebuildIndexOnExchange(int cacheId, GridDhtPartitionsExchangeFuture fut) {
+ return idxRebuildAware.rebuildIndexesOnExchange(cacheId, fut.initialVersion());
+ }
+
+ /**
+ * Preparing futures of rebuilding indexes for caches.
+ * The future for the cache will be added only if the previous one is missing or completed.
+ *
+ * @param cacheIds Cache ids.
+ * @return Cache ids for which futures have not been added.
+ */
+ public Set<Integer> prepareRebuildIndexes(Set<Integer> cacheIds) {
+ return idxRebuildAware.prepareRebuildIndexes(cacheIds, null);
+ }
+
+ /**
+ * Getting cache ids for which will need to rebuild the indexes on the exchange.
+ *
+ * @param fut Exchange future.
+ * @return Cache ids.
+ */
+ private Set<Integer> rebuildIndexCacheIds(GridDhtPartitionsExchangeFuture fut) {
+ ExchangeActions acts = fut.exchangeActions();
+
+ Set<Integer> cacheIds = emptySet();
+
+ if (acts != null) {
+ if (!F.isEmpty(acts.cacheStartRequests())) {
+ cacheIds = acts.cacheStartRequests().stream()
+ .map(d -> CU.cacheId(d.request().cacheName()))
+ .collect(toSet());
+ }
+ else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
+ cacheIds = acts.localJoinContext().caches().stream()
+ .map(t2 -> t2.get1().cacheId())
+ .collect(toSet());
+ }
+ }
+
+ return cacheIds;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java
new file mode 100644
index 0000000..0535705
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptySet;
+
+/**
+ * Holder of actual information about the state of rebuilding indexes.
+ * Thread safe.
+ */
+public class IndexRebuildAware {
+ /**
+ * Futures to track the status of index rebuilds.
+ * Mapping: Cache id -> Future.
+ * Guarded by {@code this}.
+ */
+ private final Map<Integer, GridFutureAdapter<Void>> futs = new HashMap<>();
+
+ /**
+ * Topology versions that will require index rebuilding on exchange.
+ * Mapping: Cache id -> Topology version.
+ * Guarded by {@code this}.
+ */
+ private final Map<Integer, AffinityTopologyVersion> tops = new HashMap<>();
+
+ /**
+ * Getting index rebuild future for the cache.
+ *
+ * @param cacheId Cache id.
+ * @return Future if rebuilding is in progress or {@code null} if not.
+ */
+ @Nullable public synchronized GridFutureAdapter<Void> indexRebuildFuture(int cacheId) {
+ return futs.get(cacheId);
+ }
+
+ /**
+ * Checks that the indexes need to be rebuilt on the exchange.
+ *
+ * @param cacheId Cache id.
+ * @param topVer Topology version.
+ * @return {@code True} if need to rebuild.
+ */
+ public synchronized boolean rebuildIndexesOnExchange(int cacheId, AffinityTopologyVersion topVer) {
+ boolean rebuild = tops.containsKey(cacheId) && topVer.equals(tops.get(cacheId));
+
+ if (rebuild)
+ assert futs.get(cacheId) != null;
+
+ return rebuild;
+ }
+
+ /**
+ * Canceling index rebuilding for caches on the exchange.
+ *
+ * @param cacheIds Cache ids.
+ * @param topVer Topology version.
+ */
+ public void cancelRebuildIndexesOnExchange(Set<Integer> cacheIds, AffinityTopologyVersion topVer) {
+ if (!cacheIds.isEmpty()) {
+ synchronized (this) {
+ for (Integer cacheId : cacheIds) {
+ if (tops.containsKey(cacheId) && topVer.equals(tops.get(cacheId)))
+ onFinishRebuildIndexes(cacheId, null);
+ }
+ }
+ }
+ }
+
+ /**
+ * Preparing futures of rebuilding indexes for caches.
+ * The future for the cache will be added only if the previous one is missing or completed.
+ *
+ * @param cacheIds Cache ids.
+ * @param topVer Topology version if rebuilding indexes should be on exchange.
+ * @return Cache ids for which futures have not been added.
+ */
+ public Set<Integer> prepareRebuildIndexes(Set<Integer> cacheIds, @Nullable AffinityTopologyVersion topVer) {
+ if (!cacheIds.isEmpty()) {
+ synchronized (this) {
+ Set<Integer> alreadyPrepared = new HashSet<>();
+
+ for (Integer cacheId : cacheIds) {
+ GridFutureAdapter<Void> prevFut = futs.get(cacheId);
+
+ if (prevFut == null || prevFut.isDone()) {
+ if (topVer != null) {
+ AffinityTopologyVersion prevTopVer = tops.put(cacheId, topVer);
+
+ assert prevTopVer == null;
+ }
+
+ futs.put(cacheId, new GridFutureAdapter<>());
+ }
+ else
+ alreadyPrepared.add(cacheId);
+ }
+
+ return alreadyPrepared;
+ }
+ }
+ else
+ return emptySet();
+ }
+
+ /**
+ * Callback on finish of rebuilding indexes for the cache.
+ *
+ * @param cacheId Cache id.
+ * @param err Error.
+ */
+ public synchronized void onFinishRebuildIndexes(int cacheId, @Nullable Throwable err) {
+ tops.remove(cacheId);
+
+ GridFutureAdapter<Void> rmv = futs.remove(cacheId);
+
+ assert rmv != null;
+
+ rmv.onDone(err);
+ }
+}
+
+
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index c4e3159..a7f4c74 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
@@ -81,6 +82,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -90,6 +92,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeMan
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -2676,4 +2679,39 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
else
assertNotNull(logPtr);
}
+
+ /**
+ * Initiate an asynchronous forced index rebuild for caches.
+ *
+ * @param n Node.
+ * @param cacheCtxs Cache contexts.
+ * @return Cache contexts for which index rebuilding is not initiated by
+ * this call because they are already in the process of rebuilding.
+ */
+ protected Collection<GridCacheContext> forceRebuildIndexes(IgniteEx n, GridCacheContext... cacheCtxs) {
+ return n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtxs));
+ }
+
+ /**
+ * Getting rebuild index future for the cache.
+ *
+ * @param n Node.
+ * @param cacheId Cache id.
+ * @return Rebuild index future.
+ */
+ @Nullable protected IgniteInternalFuture<?> indexRebuildFuture(IgniteEx n, int cacheId) {
+ return n.context().query().indexRebuildFuture(cacheId);
+ }
+
+ /**
+ * Getting cache metrics.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @return Cache metrics.
+ */
+ @Nullable protected CacheMetricsImpl cacheMetrics0(IgniteEx n, String cacheName) {
+ return Optional.ofNullable(n.cachex(cacheName)).map(IgniteInternalCache::context)
+ .map(GridCacheContext::cache).map(GridCacheAdapter::metrics0).orElse(null);
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
new file mode 100644
index 0000000..8f012a7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.query.IndexRebuildAware;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.emptyList;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
+import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing forced rebuilding of indexes.
+ */
+public class ForceRebuildIndexTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ ).setCacheConfiguration(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Person.class)
+ );
+ }
+
+ /**
+ * Checking that a forced rebuild of indexes is possible only after the previous one has finished.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSequentialForceRebuildIndexes() throws Exception {
+ IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+
+ IgniteEx n = prepareCluster(100);
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+ addCacheRowConsumer(nodeName(n), cacheCtx.name(), stopRebuildIdxConsumer);
+
+ // The forced rebuild has begun - nothing was rejected.
+ assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
+
+ IgniteInternalFuture<?> idxRebFut0 = checkStartRebuildIndexes(n, cacheCtx);
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ assertFalse(idxRebFut0.isDone());
+
+ // There will be no forced rebuilding since the previous one has not ended - they will be rejected.
+ assertEqualsCollections(F.asList(cacheCtx), forceRebuildIndexes(n, cacheCtx));
+ assertTrue(idxRebFut0 == indexRebuildFuture(n, cacheCtx.cacheId()));
+
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ idxRebFut0.get(getTestTimeout());
+
+ checkFinishRebuildIndexes(n, cacheCtx, 100);
+ assertEquals(100, stopRebuildIdxConsumer.visitCnt.get());
+
+ stopRebuildIdxConsumer.resetFutures();
+
+ // Forced rebuilding is possible again since the previous one is over - nothing is rejected.
+ assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
+
+ IgniteInternalFuture<?> idxRebFut1 = checkStartRebuildIndexes(n, cacheCtx);
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ assertFalse(idxRebFut1.isDone());
+
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ idxRebFut1.get(getTestTimeout());
+
+ checkFinishRebuildIndexes(n, cacheCtx, 100);
+ assertEquals(200, stopRebuildIdxConsumer.visitCnt.get());
+ }
+
+ /**
+ * Checking that a forced index rebuild can only be performed after the index rebuild started by an exchange has finished.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testForceRebuildIndexesAfterExchange() throws Exception {
+ IgniteEx n = prepareCluster(100);
+
+ stopAllGrids();
+ deleteIndexBin(n.context().igniteInstanceName());
+
+ IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+ addCacheRowConsumer(nodeName(n), DEFAULT_CACHE_NAME, stopRebuildIdxConsumer);
+
+ n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+
+ IgniteInternalFuture<?> idxRebFut0 = checkStartRebuildIndexes(n, cacheCtx);
+ checkRebuildAfterExchange(n, cacheCtx.cacheId(), true);
+
+ // There will be no forced rebuilding of indexes since the rebuild after the exchange has not ended - they will be rejected.
+ assertEqualsCollections(F.asList(cacheCtx), forceRebuildIndexes(n, cacheCtx));
+ assertTrue(idxRebFut0 == indexRebuildFuture(n, cacheCtx.cacheId()));
+ checkRebuildAfterExchange(n, cacheCtx.cacheId(), true);
+
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ idxRebFut0.get(getTestTimeout());
+
+ checkFinishRebuildIndexes(n, cacheCtx, 100);
+ assertEquals(100, stopRebuildIdxConsumer.visitCnt.get());
+ checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
+
+ stopRebuildIdxConsumer.resetFutures();
+
+ // A forced index rebuild will be triggered because the rebuild after the exchange has ended - nothing is rejected.
+ assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
+
+ IgniteInternalFuture<?> idxRebFut1 = checkStartRebuildIndexes(n, cacheCtx);
+ checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ assertFalse(idxRebFut1.isDone());
+
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ idxRebFut1.get(getTestTimeout());
+
+ checkFinishRebuildIndexes(n, cacheCtx, 100);
+ checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
+ assertEquals(200, stopRebuildIdxConsumer.visitCnt.get());
+ }
+
+ /**
+ * Checking that sequential index rebuilds on exchanges will not intersect.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSequentialRebuildIndexesOnExchange() throws Exception {
+ IgniteEx n = prepareCluster(100);
+
+ stopAllGrids();
+ deleteIndexBin(n.context().igniteInstanceName());
+
+ IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+ addCacheRowConsumer(nodeName(n), DEFAULT_CACHE_NAME, stopRebuildIdxConsumer);
+
+ n = startGrid(0);
+ n.cluster().state(ACTIVE);
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+
+ IgniteInternalFuture<?> idxRebFut = checkStartRebuildIndexes(n, cacheCtx);
+
+ // To initiate an exchange.
+ n.getOrCreateCache(DEFAULT_CACHE_NAME + "_1");
+
+ assertTrue(idxRebFut == indexRebuildFuture(n, cacheCtx.cacheId()));
+
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ idxRebFut.get(getTestTimeout());
+
+ checkFinishRebuildIndexes(n, cacheCtx, 100);
+ assertEquals(100, stopRebuildIdxConsumer.visitCnt.get());
+ }
+
+ /**
+ * Prepare cluster for test.
+ *
+ * @param keys Key count.
+ * @return Coordinator.
+ * @throws Exception If failed.
+ */
+ private IgniteEx prepareCluster(int keys) throws Exception {
+ IgniteEx n = startGrid(0);
+
+ n.cluster().state(ACTIVE);
+
+ for (int i = 0; i < keys; i++)
+ n.cache(DEFAULT_CACHE_NAME).put(i, new Person(i, "name_" + i));
+
+ return n;
+ }
+
+ /**
+ * Checking the index rebuild metrics of the cache.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @param expIdxRebuildInProgress The expected status of rebuilding indexes.
+ * @param expIdxRebuildKeysProcessed The expected number of keys processed during index rebuilding.
+ */
+ private void checkCacheMetrics0(
+ IgniteEx n,
+ String cacheName,
+ boolean expIdxRebuildInProgress,
+ long expIdxRebuildKeysProcessed
+ ) {
+ CacheMetricsImpl metrics0 = cacheMetrics0(n, cacheName);
+ assertNotNull(metrics0);
+
+ assertEquals(expIdxRebuildInProgress, metrics0.isIndexRebuildInProgress());
+ assertEquals(expIdxRebuildKeysProcessed, metrics0.getIndexRebuildKeysProcessed());
+ }
+
+ /**
+ * Checking that rebuilding indexes for the cache has started.
+ *
+ * @param n Node.
+ * @param cacheCtx Cache context.
+ * @return Rebuild index future.
+ */
+ private IgniteInternalFuture<?> checkStartRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx) {
+ IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ assertNotNull(idxRebFut);
+ assertFalse(idxRebFut.isDone());
+
+ checkCacheMetrics0(n, cacheCtx.name(), true, 0);
+
+ return idxRebFut;
+ }
+
+ /**
+ * Checking that the rebuild of indexes for the cache has completed.
+ *
+ * @param n Node.
+ * @param cacheCtx Cache context.
+ * @param expKeys The expected number of keys processed during index rebuilding
+ */
+ private void checkFinishRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx, int expKeys) {
+ assertNull(indexRebuildFuture(n, cacheCtx.cacheId()));
+
+ checkCacheMetrics0(n, cacheCtx.name(), false, expKeys);
+ }
+
+ /**
+ * Checking whether the cache is tracked by {@code GridQueryProcessor#idxRebuildAware} for the last exchange.
+ * Allows to check whether the cache is marked as requiring an index rebuild after the exchange.
+ *
+ * @param n Node.
+ * @param cacheId Cache id.
+ * @param expContains Whether a cache is expected.
+ */
+ private void checkRebuildAfterExchange(IgniteEx n, int cacheId, boolean expContains) {
+ IndexRebuildAware idxRebuildAware = getFieldValue(n.context().query(), "idxRebuildAware");
+
+ GridDhtPartitionsExchangeFuture exhFut = n.context().cache().context().exchange().lastTopologyFuture();
+
+ assertEquals(expContains, idxRebuildAware.rebuildIndexesOnExchange(cacheId, exhFut.initialVersion()));
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
new file mode 100644
index 0000000..65c0b6b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * Extension {@link IndexesRebuildTask} for the tests.
+ */
+class IndexesRebuildTaskEx extends IndexesRebuildTask {
+ /**
+ * Consumer for cache rows when rebuilding indexes on a node.
+ * Mapping: Node name -> Cache name -> Consumer.
+ */
+ private static final Map<String, Map<String, IgniteThrowableConsumer<CacheDataRow>>> cacheRowConsumer =
+ new ConcurrentHashMap<>();
+
+ /**
+ * A function that should run before preparing to rebuild the cache indexes on a node.
+ * Mapping: Node name -> Cache name -> Function.
+ */
+ private static final Map<String, Map<String, Runnable>> cacheRebuildRunner = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected void startRebuild(
+ GridCacheContext cctx,
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexCacheVisitorClosure clo,
+ SchemaIndexOperationCancellationToken cancel
+ ) {
+ super.startRebuild(cctx, rebuildIdxFut, new SchemaIndexCacheVisitorClosure() {
+ /** {@inheritDoc} */
+ @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
+ cacheRowConsumer.getOrDefault(nodeName(cctx), emptyMap())
+ .getOrDefault(cctx.name(), r -> { }).accept(row);
+
+ clo.apply(row);
+ }
+ }, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public IgniteInternalFuture<?> rebuild(GridCacheContext cctx, boolean force) {
+ cacheRebuildRunner.getOrDefault(nodeName(cctx), emptyMap()).getOrDefault(cctx.name(), () -> { }).run();
+
+ return super.rebuild(cctx, force);
+ }
+
+ /**
+ * Cleaning of internal structures. It is recommended to clean at
+ * {@code GridCommonAbstractTest#beforeTest} and {@code GridCommonAbstractTest#afterTest}.
+ *
+ * @param nodeNamePrefix Prefix of node name ({@link GridKernalContext#igniteInstanceName()})
+ * for which internal structures will be removed, if {@code null} will be removed for all.
+ *
+ * @see GridCommonAbstractTest#getTestIgniteInstanceName()
+ */
+ static void clean(@Nullable String nodeNamePrefix) {
+ if (nodeNamePrefix == null) {
+ cacheRowConsumer.clear();
+ cacheRebuildRunner.clear();
+ }
+ else {
+ cacheRowConsumer.entrySet().removeIf(e -> e.getKey().startsWith(nodeNamePrefix));
+ cacheRebuildRunner.entrySet().removeIf(e -> e.getKey().startsWith(nodeNamePrefix));
+ }
+ }
+
+ /**
+ * Registering a consumer for cache rows when rebuilding indexes on a node.
+ *
+ * @param nodeName The name of the node,
+ * the value of which will return {@link GridKernalContext#igniteInstanceName()}.
+ * @param cacheName Cache name.
+ * @param c Cache row consumer.
+ *
+ * @see #nodeName(GridKernalContext)
+ * @see #nodeName(IgniteEx)
+ * @see #nodeName(GridCacheContext)
+ * @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
+ */
+ static void addCacheRowConsumer(String nodeName, String cacheName, IgniteThrowableConsumer<CacheDataRow> c) {
+ cacheRowConsumer.computeIfAbsent(nodeName, s -> new ConcurrentHashMap<>()).put(cacheName, c);
+ }
+
+ /**
+ * Registering a function that should run before preparing to rebuild the cache indexes on a node.
+ *
+ * @param nodeName The name of the node,
+ * the value of which will return {@link GridKernalContext#igniteInstanceName()}.
+ * @param cacheName Cache name.
+ * @param r A function that should run before preparing to rebuild the cache indexes.
+ *
+ * @see #nodeName(GridKernalContext)
+ * @see #nodeName(IgniteEx)
+ * @see #nodeName(GridCacheContext)
+ * @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
+ */
+ static void addCacheRebuildRunner(String nodeName, String cacheName, Runnable r) {
+ cacheRebuildRunner.computeIfAbsent(nodeName, s -> new ConcurrentHashMap<>()).put(cacheName, r);
+ }
+
+ /**
+ * Getting local instance name of the node.
+ *
+ * @param cacheCtx Cache context.
+ * @return Local instance name.
+ */
+ static String nodeName(GridCacheContext cacheCtx) {
+ return nodeName(cacheCtx.kernalContext());
+ }
+
+ /**
+ * Getting local instance name of the node.
+ *
+ * @param n Node.
+ * @return Local instance name.
+ */
+ static String nodeName(IgniteEx n) {
+ return nodeName(n.context());
+ }
+
+ /**
+ * Getting local instance name of the node.
+ *
+ * @param kernalCtx Kernal context.
+ * @return Local instance name.
+ */
+ static String nodeName(GridKernalContext kernalCtx) {
+ return kernalCtx.igniteInstanceName();
+ }
+
+ /**
+ * Consumer for stopping rebuilding indexes of cache.
+ */
+ static class StopRebuildIndexConsumer implements IgniteThrowableConsumer<CacheDataRow> {
+ /** Future to indicate that the rebuilding indexes has begun. */
+ final GridFutureAdapter<Void> startRebuildIdxFut = new GridFutureAdapter<>();
+
+ /** Future to wait to continue rebuilding indexes. */
+ final GridFutureAdapter<Void> finishRebuildIdxFut = new GridFutureAdapter<>();
+
+ /** Counter of visits. */
+ final AtomicLong visitCnt = new AtomicLong();
+
+ /** The maximum time to wait finish future in milliseconds. */
+ final long timeout;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout The maximum time to wait finish future in milliseconds.
+ */
+ StopRebuildIndexConsumer(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+ startRebuildIdxFut.onDone();
+
+ visitCnt.incrementAndGet();
+
+ finishRebuildIdxFut.get(timeout);
+ }
+
+ /**
+ * Resetting internal futures.
+ */
+ void resetFutures() {
+ startRebuildIdxFut.reset();
+ finishRebuildIdxFut.reset();
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
index d308065..04fe0d7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
@@ -18,8 +18,6 @@
package org.apache.ignite.internal.processors.cache.index;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.client.Person;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -32,16 +30,13 @@ import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheCompoundFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheStat;
-import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationException;
-import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -50,10 +45,11 @@ import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRebuildRunner;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
-import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
@@ -64,8 +60,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- IndexesRebuildTaskEx.cacheRowConsumer.clear();
- IndexesRebuildTaskEx.cacheRebuildRunner.clear();
+ IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
stopAllGrids();
cleanPersistenceDir();
@@ -75,8 +70,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
@Override protected void afterTest() throws Exception {
super.afterTest();
- IndexesRebuildTaskEx.cacheRowConsumer.clear();
- IndexesRebuildTaskEx.cacheRebuildRunner.clear();
+ IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
stopAllGrids();
cleanPersistenceDir();
@@ -186,21 +180,18 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
IgniteEx n = prepareCluster(10);
- GridFutureAdapter<?> f0 = new GridFutureAdapter<>();
- GridFutureAdapter<?> f1 = new GridFutureAdapter<>();
-
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- IndexesRebuildTaskEx.cacheRebuildRunner.put(
- DEFAULT_CACHE_NAME, () -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId())));
-
- IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
- f0.onDone();
+ addCacheRebuildRunner(
+ nodeName(n),
+ cacheCtx.name(),
+ () -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId()))
+ );
- f1.get(getTestTimeout());
- });
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+ addCacheRowConsumer(nodeName(n), cacheCtx.name(), stopRebuildIdxConsumer);
- n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtx));
+ forceRebuildIndexes(n, cacheCtx);
IgniteInternalFuture<?> rebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
assertNotNull(rebFut0);
@@ -208,13 +199,13 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
SchemaIndexCacheFuture rebFut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
assertNotNull(rebFut1);
- f0.get(getTestTimeout());
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
assertFalse(rebFut0.isDone());
assertFalse(rebFut1.isDone());
assertFalse(rebFut1.cancelToken().isCancelled());
- f1.onDone();
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
rebFut0.get(getTestTimeout());
rebFut1.get(getTestTimeout());
@@ -226,87 +217,6 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
}
/**
- * Checks that when starting an index rebuild sequentially,
- * the previous rebuild will be canceled and a new one will start.
- *
- * This behavior should be discussed in IGNITE-14321.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testSequentialRebuildIndexes() throws Exception {
- IgniteEx n = prepareCluster(10);
-
- int cacheId = n.cachex(DEFAULT_CACHE_NAME).context().cacheId();
- int cacheSize = n.cachex(DEFAULT_CACHE_NAME).size();
-
- stopAllGrids();
- deleteIndexBin(n.context().igniteInstanceName());
-
- IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
-
- GridFutureAdapter<?> startBlockRebIdxFut0 = new GridFutureAdapter<>();
- GridFutureAdapter<?> endBlockRebIdxFut0 = new GridFutureAdapter<>();
-
- IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
- IgniteInternalFuture<?> fut = indexRebuildFuture(grid(0), cacheId);
- assertNotNull(fut);
- assertFalse(fut.isDone());
-
- startBlockRebIdxFut0.onDone();
- endBlockRebIdxFut0.get(getTestTimeout());
- });
-
- n = startGrid(0);
-
- n.cluster().state(ACTIVE);
- awaitPartitionMapExchange();
-
- startBlockRebIdxFut0.get(getTestTimeout());
-
- IgniteInternalFuture<?> rebIdxFut = indexRebuildFuture(n, cacheId);
- assertNotNull(rebIdxFut);
- assertFalse(rebIdxFut.isDone());
-
- SchemaIndexCacheFuture intRebIdxFut = internalIndexRebuildFuture(n, cacheId);
- assertNotNull(intRebIdxFut);
- assertFalse(intRebIdxFut.isDone());
- assertFalse(intRebIdxFut.cancelToken().isCancelled());
-
- GridFutureAdapter<IgniteInternalFuture<?>> forceRebIdxFut = new GridFutureAdapter<>();
-
- IgniteInternalFuture<?> startForceRebIdxFut = runAsync(() -> {
- IgniteEx n0 = grid(0);
-
- IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
- forceRebIdxFut.onDone(internalIndexRebuildFuture(n0, cacheId));
- });
-
- n0.context().cache().context().database().forceRebuildIndexes(
- F.asList(n0.cachex(DEFAULT_CACHE_NAME).context()));
-
- return null;
- });
-
- assertTrue(waitForCondition(intRebIdxFut.cancelToken()::isCancelled, getTestTimeout()));
- endBlockRebIdxFut0.onDone();
-
- rebIdxFut.get(getTestTimeout());
-
- assertThrows(
- log,
- () -> intRebIdxFut.get(getTestTimeout()),
- SchemaIndexOperationCancellationException.class,
- null
- );
-
- startForceRebIdxFut.get(getTestTimeout());
- forceRebIdxFut.get(getTestTimeout()).get(getTestTimeout());
-
- assertEquals(cacheSize, cacheMetrics0(n, DEFAULT_CACHE_NAME).getIndexRebuildKeysProcessed());
- }
-
- /**
* Restart the rebuild of the indexes, checking that it completes gracefully.
*
* @param stopRebuildIndexes Stop index rebuild function.
@@ -322,12 +232,11 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
int keys = 100_000;
IgniteEx n = prepareCluster(keys);
- IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
- U.sleep(10);
- });
-
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtx));
+
+ addCacheRowConsumer(nodeName(n), cacheCtx.name(), row -> U.sleep(10));
+
+ forceRebuildIndexes(n, cacheCtx);
IgniteInternalFuture<?> fut0 = indexRebuildFuture(n, cacheCtx.cacheId());
assertNotNull(fut0);
@@ -335,7 +244,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
SchemaIndexCacheFuture fut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
assertNotNull(fut1);
- CacheMetricsImpl metrics0 = cacheMetrics0(n, DEFAULT_CACHE_NAME);
+ CacheMetricsImpl metrics0 = cacheMetrics0(n, cacheCtx.name());
assertTrue(metrics0.isIndexRebuildInProgress());
assertFalse(fut0.isDone());
@@ -399,62 +308,4 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
return ((Map<Integer, SchemaIndexCacheFuture>)getFieldValueHierarchy(idxRebuild, "idxRebuildFuts")).get(cacheId);
}
-
- /**
- * Getting rebuild index future for the cache.
- *
- * @param n Node.
- * @param cacheId Cache id.
- * @return Rebuild index future.
- */
- @Nullable private IgniteInternalFuture<?> indexRebuildFuture(IgniteEx n, int cacheId) {
- return n.context().query().indexRebuildFuture(cacheId);
- }
-
- /**
- * Getting cache metrics.
- *
- * @param n Node.
- * @param cacheName Cache name.
- * @return Cache metrics.
- */
- private CacheMetricsImpl cacheMetrics0(IgniteEx n, String cacheName) {
- return n.cachex(cacheName).context().cache().metrics0();
- }
-
- /**
- * Extension {@link IndexesRebuildTask} for the test.
- */
- private static class IndexesRebuildTaskEx extends IndexesRebuildTask {
- /** Consumer for cache rows when rebuilding indexes. */
- private static final Map<String, IgniteThrowableConsumer<CacheDataRow>> cacheRowConsumer =
- new ConcurrentHashMap<>();
-
- /** A function that should run before preparing to rebuild the cache indexes. */
- private static final Map<String, Runnable> cacheRebuildRunner = new ConcurrentHashMap<>();
-
- /** {@inheritDoc} */
- @Override protected void startRebuild(
- GridCacheContext cctx,
- GridFutureAdapter<Void> rebuildIdxFut,
- SchemaIndexCacheVisitorClosure clo,
- SchemaIndexOperationCancellationToken cancel
- ) {
- super.startRebuild(cctx, rebuildIdxFut, new SchemaIndexCacheVisitorClosure() {
- /** {@inheritDoc} */
- @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
- cacheRowConsumer.getOrDefault(cctx.name(), r -> {}).accept(row);
-
- clo.apply(row);
- }
- }, cancel);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> rebuild(GridCacheContext cctx) {
- cacheRebuildRunner.getOrDefault(cctx.name(), () -> {}).run();
-
- return super.rebuild(cctx);
- }
- }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index f3b09c6..61e513b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -308,8 +309,8 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
private boolean rebuiltIndexes;
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> rebuild(GridCacheContext cctx) {
- IgniteInternalFuture<?> future = super.rebuild(cctx);
+ @Override @Nullable public IgniteInternalFuture<?> rebuild(GridCacheContext cctx, boolean force) {
+ IgniteInternalFuture<?> future = super.rebuild(cctx, force);
rebuiltIndexes = future != null;
return future;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index cdd352b..593f5df 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.encryption.CacheGroupReencryptionTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingAndGroupPutGetPersistenceSelfTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
+import org.apache.ignite.internal.processors.cache.index.ForceRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.index.StopRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsIndexingDefragmentationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
@@ -64,7 +65,8 @@ import org.junit.runners.Suite;
MultipleParallelCacheDeleteDeadlockTest.class,
CacheGroupReencryptionTest.class,
IgnitePdsIndexingDefragmentationTest.class,
- StopRebuildIndexTest.class
+ StopRebuildIndexTest.class,
+ ForceRebuildIndexTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}