You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/06/07 09:11:14 UTC
[ignite] branch master updated: IGNITE-8719: Fix index create or
rebuild procedure in failover scenariots. (#9090)
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 959a6be IGNITE-8719: Fix index create or rebuild procedure in failover scenariots. (#9090)
959a6be is described below
commit 959a6be5c5a81765497bd8b90095c451b5634668
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Jun 7 12:10:56 2021 +0300
IGNITE-8719: Fix index create or rebuild procedure in failover scenariots. (#9090)
---
.../internal/dto/IgniteDataTransferObject.java | 2 +-
.../managers/indexing/IndexesRebuildTask.java | 4 +
.../processors/cache/GridCacheProcessor.java | 97 ++---
.../GridCacheDatabaseSharedManager.java | 7 +-
.../processors/query/GridQueryProcessor.java | 119 ++++--
.../query/aware/IndexRebuildCacheInfo.java | 80 ++++
.../IndexRebuildFutureStorage.java} | 6 +-
.../processors/query/aware/IndexRebuildState.java | 104 +++++
.../query/aware/IndexRebuildStateStorage.java | 298 ++++++++++++++
.../internal/util/function/ThrowableFunction.java | 37 ++
.../DurableBackgroundTasksProcessorSelfTest.java | 27 --
.../apache/ignite/testframework/GridTestUtils.java | 17 +
.../junits/common/GridCommonAbstractTest.java | 26 ++
.../cache/index/AbstractRebuildIndexTest.java | 218 ++++++++++
.../cache/index/ForceRebuildIndexTest.java | 158 +------
.../cache/index/IndexesRebuildTaskEx.java | 44 ++
.../cache/index/ResumeRebuildIndexTest.java | 456 +++++++++++++++++++++
.../cache/index/StopRebuildIndexTest.java | 82 +---
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
19 files changed, 1476 insertions(+), 310 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
index 279586d..8f30be7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
* Base class for data transfer objects.
*/
public abstract class IgniteDataTransferObject implements Externalizable {
- /** */
+ /** Serial version UUID. */
private static final long serialVersionUID = 0L;
/** Magic number to detect correct transfer objects. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
index 6cb3850..69a5d95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
@@ -97,6 +97,8 @@ public class IndexesRebuildTask {
// Check that the previous rebuild is completed.
assert prevIntRebFut == null;
+ cctx.kernalContext().query().onStartRebuildIndexes(cctx);
+
rebuildCacheIdxFut.listen(fut -> {
Throwable err = fut.error();
@@ -111,6 +113,8 @@ public class IndexesRebuildTask {
if (err != null)
U.error(log, "Failed to rebuild indexes for cache: " + cacheName, err);
+ else
+ cctx.kernalContext().query().onFinishRebuildIndexes(cctx);
idxRebuildFuts.remove(cctx.cacheId(), intRebFut);
intRebFut.onDone(err);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 142c4db..6ef5047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -201,6 +201,7 @@ import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -2611,19 +2612,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name.
* @param destroy Cache data destroy flag. Setting to <code>true</code> will remove all cache data.
- * @return Stopped cache context.
*/
- public GridCacheContext<?, ?> prepareCacheStop(String cacheName, boolean destroy) {
- return prepareCacheStop(cacheName, destroy, true);
+ public void prepareCacheStop(String cacheName, boolean destroy) {
+ prepareCacheStop(cacheName, destroy, true);
}
/**
* @param cacheName Cache name.
* @param destroy Cache data destroy flag. Setting to <code>true</code> will remove all cache data.
* @param clearDbObjects If {@code false} DB objects don't removed (used for cache.close() on client node).
- * @return Stopped cache context.
*/
- public GridCacheContext<?, ?> prepareCacheStop(String cacheName, boolean destroy, boolean clearDbObjects) {
+ public void prepareCacheStop(String cacheName, boolean destroy, boolean clearDbObjects) {
assert sharedCtx.database().checkpointLockIsHeldByThread();
GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
@@ -2636,14 +2635,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
onKernalStop(cache, true);
stopCache(cache, true, destroy, clearDbObjects);
-
- return ctx;
}
else
- //Try to unregister query structures for not started caches.
+ // Try to unregister query structures for not started caches.
ctx.query().onCacheStop(cacheName);
-
- return null;
}
/**
@@ -2777,57 +2772,67 @@ public class GridCacheProcessor extends GridProcessorAdapter {
removeOffheapListenerAfterCheckpoint(grpsToStop);
Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
- .collect(groupingBy(action -> action.descriptor().groupId()));
+ .collect(groupingBy(action -> action.descriptor().groupId()));
+
+ Set<Integer> grpIdToDestroy = grpsToStop.stream()
+ .filter(IgniteBiTuple::get2).map(t2 -> t2.get1().groupId()).collect(toSet());
try {
doInParallel(
- parallelismLvl,
- sharedCtx.kernalContext().getSystemExecutorService(),
- cachesToStop.entrySet(),
- cachesToStopByGrp -> {
- CacheGroupContext gctx = cacheGrps.get(cachesToStopByGrp.getKey());
+ parallelismLvl,
+ sharedCtx.kernalContext().getSystemExecutorService(),
+ cachesToStop.entrySet(),
+ cachesToStopByGrp -> {
+ CacheGroupContext gctx = cacheGrps.get(cachesToStopByGrp.getKey());
- if (gctx != null)
- gctx.preloader().pause();
+ if (gctx != null)
+ gctx.preloader().pause();
- try {
- if (gctx != null) {
- final String msg = "Failed to wait for topology update, cache group is stopping.";
+ try {
+ if (gctx != null) {
+ final String msg = "Failed to wait for topology update, cache group is stopping.";
- // If snapshot operation in progress we must throw CacheStoppedException
- // for correct cache proxy restart. For more details see
- // IgniteCacheProxy.cacheException()
- gctx.affinity().cancelFutures(new CacheStoppedException(msg));
- }
+ // If snapshot operation in progress we must throw CacheStoppedException
+ // for correct cache proxy restart. For more details see
+ // IgniteCacheProxy.cacheException()
+ gctx.affinity().cancelFutures(new CacheStoppedException(msg));
+ }
- for (ExchangeActions.CacheActionData action: cachesToStopByGrp.getValue()) {
- stopGateway(action.request());
+ for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) {
+ stopGateway(action.request());
- context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+ context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
- // TTL manager has to be unregistered before the checkpointReadLock is acquired.
- GridCacheAdapter<?, ?> cache = caches.get(action.request().cacheName());
+ String cacheName = action.request().cacheName();
- if (cache != null)
- cache.context().ttl().unregister();
+ // TTL manager has to be unregistered before the checkpointReadLock is acquired.
+ GridCacheAdapter<?, ?> cache = caches.get(cacheName);
- sharedCtx.database().checkpointReadLock();
+ if (cache != null)
+ cache.context().ttl().unregister();
- try {
- prepareCacheStop(action.request().cacheName(), action.request().destroy());
- }
- finally {
- sharedCtx.database().checkpointReadUnlock();
- }
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+ boolean destroyCache = action.request().destroy();
+
+ prepareCacheStop(cacheName, destroyCache);
+
+ if (destroyCache || grpIdToDestroy.contains(cachesToStopByGrp.getKey()))
+ ctx.query().completeRebuildIndexes(cacheName);
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
}
}
- finally {
- if (gctx != null)
- gctx.preloader().resume();
- }
-
- return null;
}
+ finally {
+ if (gctx != null)
+ gctx.preloader().resume();
+ }
+
+ return null;
+ }
);
}
catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a518c92..b7f9625 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1492,9 +1492,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (GridCacheContext cacheCtx : contexts) {
if (rebuildCond.test(cacheCtx)) {
- IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx, force);
+ IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(
+ cacheCtx,
+ force || !qryProc.rebuildIndexesCompleted(cacheCtx)
+ );
- if (nonNull(rebuildFut))
+ if (rebuildFut != null)
rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
else
rebuildIndexesCompleteCntr.countDown(false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8348ad3..f3e4f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -87,6 +87,8 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.query.aware.IndexRebuildFutureStorage;
+import org.apache.ignite.internal.processors.query.aware.IndexRebuildStateStorage;
import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -242,10 +244,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Cache name - value typeId pairs for which type mismatch message was logged. */
private final Set<Long> missedCacheTypes = ConcurrentHashMap.newKeySet();
- /** Index rebuild aware. */
- private final IndexRebuildAware idxRebuildAware = new IndexRebuildAware();
+ /** Index rebuild futures. */
+ private final IndexRebuildFutureStorage idxRebuildFutStorage = new IndexRebuildFutureStorage();
+
+ /** Index rebuild states. */
+ private final IndexRebuildStateStorage idxRebuildStateStorage;
/**
+ * Constructor.
+ *
* @param ctx Kernal context.
*/
public GridQueryProcessor(GridKernalContext ctx) throws IgniteCheckedException {
@@ -257,25 +264,25 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idxCls = null;
}
else
- idx = INDEXING.inClassPath() ? U.<GridQueryIndexing>newInstance(INDEXING.className()) : null;
+ idx = INDEXING.inClassPath() ? U.newInstance(INDEXING.className()) : null;
idxProc = ctx.indexProcessor();
valCtx = new CacheQueryObjectValueContext(ctx);
- ioLsnr = new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
- if (msg instanceof SchemaOperationStatusMessage) {
- SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
+ ioLsnr = (nodeId, msg, plc) -> {
+ if (msg instanceof SchemaOperationStatusMessage) {
+ SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg;
- msg0.senderNodeId(nodeId);
+ msg0.senderNodeId(nodeId);
- processStatusMessage(msg0);
- }
- else
- U.warn(log, "Unsupported IO message: " + msg);
+ processStatusMessage(msg0);
}
+ else
+ U.warn(log, "Unsupported IO message: " + msg);
};
+
+ idxRebuildStateStorage = new IndexRebuildStateStorage(ctx);
}
/** {@inheritDoc} */
@@ -291,12 +298,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
ctx.io().addMessageListener(TOPIC_SCHEMA, ioLsnr);
// Schedule queries detail metrics eviction.
- qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
- @Override public void run() {
- for (GridCacheContext ctxs : ctx.cache().context().cacheContexts())
- ctxs.queries().evictDetailMetrics();
- }
+ qryDetailMetricsEvictTask = ctx.timeout().schedule(() -> {
+ for (GridCacheContext ctxs : ctx.cache().context().cacheContexts())
+ ctxs.queries().evictDetailMetrics();
}, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ);
+
+ idxRebuildStateStorage.start();
}
/** {@inheritDoc} */
@@ -318,6 +325,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
busyLock.block();
+
+ idxRebuildStateStorage.stop();
}
/** {@inheritDoc} */
@@ -347,6 +356,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
for (SchemaOperation schemaOp : schemaOps.values())
onSchemaPropose(schemaOp.proposeMessage());
}
+
+ idxRebuildStateStorage.onCacheKernalStart();
}
/**
@@ -459,7 +470,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
public void beforeExchange(GridDhtPartitionsExchangeFuture fut) {
Set<Integer> cacheIds = rebuildIndexCacheIds(fut);
- Set<Integer> rejected = idxRebuildAware.prepareRebuildIndexes(cacheIds, fut.initialVersion());
+ Set<Integer> rejected = idxRebuildFutStorage.prepareRebuildIndexes(cacheIds, fut.initialVersion());
if (log.isDebugEnabled()) {
log.debug("Preparing features of rebuilding indexes for caches on exchange [requested=" + cacheIds +
@@ -1947,7 +1958,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false);
}
- if (idxRebuildAware.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), null).isEmpty())
+ if (idxRebuildFutStorage.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), null).isEmpty())
rebuildIndexesFromHash0(cacheInfo.cacheContext(), false);
else {
if (log.isInfoEnabled())
@@ -2377,7 +2388,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Nullable IgniteInternalFuture<?> idxFut,
GridCacheContext<?, ?> cctx
) {
- GridFutureAdapter<Void> res = idxRebuildAware.indexRebuildFuture(cctx.cacheId());
+ GridFutureAdapter<Void> res = idxRebuildFutStorage.indexRebuildFuture(cctx.cacheId());
assert res != null;
@@ -2395,13 +2406,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
else if (!(err instanceof NodeStoppingException))
log.error("Failed to rebuild indexes for cache " + cacheInfo, err);
- idxRebuildAware.onFinishRebuildIndexes(cctx.cacheId(), err);
+ idxRebuildFutStorage.onFinishRebuildIndexes(cctx.cacheId(), err);
});
return res;
}
else {
- idxRebuildAware.onFinishRebuildIndexes(cctx.cacheId(), null);
+ idxRebuildFutStorage.onFinishRebuildIndexes(cctx.cacheId(), null);
return null;
}
@@ -2411,7 +2422,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Future that will be completed when indexes for given cache are restored.
*/
@Nullable public IgniteInternalFuture<?> indexRebuildFuture(int cacheId) {
- return idxRebuildAware.indexRebuildFuture(cacheId);
+ return idxRebuildFutStorage.indexRebuildFuture(cacheId);
}
/**
@@ -3811,7 +3822,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
GridDhtPartitionsExchangeFuture fut,
@Nullable Set<Integer> cacheIds
) {
- idxRebuildAware.cancelRebuildIndexesOnExchange(
+ idxRebuildFutStorage.cancelRebuildIndexesOnExchange(
cacheIds != null ? cacheIds : rebuildIndexCacheIds(fut),
fut.initialVersion()
);
@@ -3825,7 +3836,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return {@code True} if need to rebuild.
*/
public boolean rebuildIndexOnExchange(int cacheId, GridDhtPartitionsExchangeFuture fut) {
- return idxRebuildAware.rebuildIndexesOnExchange(cacheId, fut.initialVersion());
+ return idxRebuildFutStorage.rebuildIndexesOnExchange(cacheId, fut.initialVersion());
}
/**
@@ -3836,7 +3847,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Cache ids for which features have not been added.
*/
public Set<Integer> prepareRebuildIndexes(Set<Integer> cacheIds) {
- return idxRebuildAware.prepareRebuildIndexes(cacheIds, null);
+ return idxRebuildFutStorage.prepareRebuildIndexes(cacheIds, null);
}
/**
@@ -3865,4 +3876,60 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return cacheIds;
}
+
+ /**
+ * Callback on start of rebuild cache indexes.
+ * <p/>
+ * Adding an entry that rebuilding the cache indexes in progress.
+ * If the cache is persistent, then add this entry to the MetaStorage.
+ * <p/>
+ * When restarting/reactivating the node, it will be possible to check if
+ * the rebuilding of the indexes has been {@link #rebuildIndexesCompleted}.
+ *
+ * @param cacheCtx Cache context.
+ * @see #onFinishRebuildIndexes
+ * @see #rebuildIndexesCompleted
+ */
+ public void onStartRebuildIndexes(GridCacheContext cacheCtx) {
+ idxRebuildStateStorage.onStartRebuildIndexes(cacheCtx);
+ }
+
+ /**
+ * Callback on finish of rebuild cache indexes.
+ * <p/>
+ * If the cache is persistent, then we mark that the rebuilding of the
+ * indexes is completed and the entry will be deleted from the MetaStorage
+ * at the end of the checkpoint. Otherwise, delete the index rebuild entry.
+ *
+ * @param cacheCtx Cache context.
+ */
+ public void onFinishRebuildIndexes(GridCacheContext cacheCtx) {
+ idxRebuildStateStorage.onFinishRebuildIndexes(cacheCtx.name());
+ }
+
+ /**
+ * Check if rebuilding of indexes for the cache has been completed.
+ *
+ * @param cacheCtx Cache context.
+ * @return {@code True} if completed.
+ * @see #onStartRebuildIndexes
+ * @see #onFinishRebuildIndexes
+ */
+ public boolean rebuildIndexesCompleted(GridCacheContext cacheCtx) {
+ return idxRebuildStateStorage.completed(cacheCtx.name());
+ }
+
+ /**
+ * Force a mark that the index rebuild for the cache has completed.
+ * <p/>
+ * If the cache is persistent, then we mark that the rebuilding of the
+ * indexes is completed and the entry will be deleted from the MetaStorage
+ * at the end of the checkpoint. Otherwise, delete the index rebuild entry.
+ *
+ *
+ * @param cacheName Cache name.
+ */
+ public void completeRebuildIndexes(String cacheName) {
+ idxRebuildStateStorage.onFinishRebuildIndexes(cacheName);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java
new file mode 100644
index 0000000..4bab971
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.aware;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Information about the cache for which index rebuilding was started.
+ * Designed for MetaStorage.
+ */
+public class IndexRebuildCacheInfo extends IgniteDataTransferObject {
+ /** Serial version UUID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /**
+ * Default constructor for {@link Externalizable}.
+ */
+ public IndexRebuildCacheInfo() {
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheName Cache name.
+ */
+ public IndexRebuildCacheInfo(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeLongString(out, cacheName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(
+ byte protoVer,
+ ObjectInput in
+ ) throws IOException, ClassNotFoundException {
+ cacheName = U.readLongString(in);
+ }
+
+ /**
+ * Getting cache name.
+ *
+ * @return Cache name.
+ */
+ public String cacheName() {
+ return cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexRebuildCacheInfo.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildFutureStorage.java
similarity index 96%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildFutureStorage.java
index 0535705..df917b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildFutureStorage.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.aware;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,10 +28,10 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptySet;
/**
- * Holder actual information about the state of rebuilding indexes.
+ * Holder of up-to-date information of rebuild index futures for caches.
* Thread safe.
*/
-public class IndexRebuildAware {
+public class IndexRebuildFutureStorage {
/**
* Futures to track the status of index rebuilds.
* Mapping: Cache id -> Future.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildState.java
new file mode 100644
index 0000000..06a2b62
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildState.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.aware;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.INIT;
+
+/**
+ * State of rebuilding indexes for the cache.
+ */
+public class IndexRebuildState {
+ /**
+ * Enumeration of index rebuild states.
+ */
+ enum State {
+ /** Initial state. */
+ INIT,
+
+ /** Completed. */
+ COMPLETED,
+
+ /** To be deleted. */
+ DELETE
+ }
+
+ /** Index rebuild state state atomic updater. */
+ private static final AtomicReferenceFieldUpdater<IndexRebuildState, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(IndexRebuildState.class, State.class, "state");
+
+ /** Persistent cache. */
+ private final boolean persistent;
+
+ /** Index rebuild state. */
+ private volatile State state = INIT;
+
+ /**
+ * Constructor.
+ *
+ * @param persistent Persistent cache.
+ */
+ public IndexRebuildState(boolean persistent) {
+ this.persistent = persistent;
+ }
+
+ /**
+ * Checking if the cache is persistent.
+ *
+ * @return {@code True} if persistent.
+ */
+ public boolean persistent() {
+ return persistent;
+ }
+
+ /**
+ * Getting the state rebuild of indexes.
+ *
+ * @return Current state.
+ */
+ public State state() {
+ return state;
+ }
+
+ /**
+ * Setting the state rebuild of the indexes.
+ *
+ * @param state New state.
+ */
+ public void state(State state) {
+ this.state = state;
+ }
+
+ /**
+ * Atomically sets of the state rebuild of the indexes.
+ *
+ * @param exp Expected state.
+ * @param newState New state.
+ * @return {@code True} if successful.
+ */
+ public boolean state(State exp, State newState) {
+ return STATE_UPDATER.compareAndSet(this, exp, newState);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IndexRebuildState.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildStateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildStateStorage.java
new file mode 100644
index 0000000..3a656ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildStateStorage.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.aware;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.DELETE;
+import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.INIT;
+
+/**
+ * Holder of up-to-date information about rebuilding cache indexes.
+ * Helps to avoid the situation when the index rebuilding process is interrupted
+ * and after a node restart/reactivation the indexes will become inconsistent.
+ * <p/>
+ * To do this, before rebuilding the indexes, call {@link #onStartRebuildIndexes}
+ * and after it {@link #onFinishRebuildIndexes}. Use {@link #completed} to check
+ * if the index rebuild has completed.
+ * <p/>
+ * To prevent leaks, it is necessary to use {@link #onFinishRebuildIndexes}
+ * when detecting the fact of destroying the cache.
+ */
+public class IndexRebuildStateStorage implements MetastorageLifecycleListener, CheckpointListener {
+ /** Key prefix for the MetaStorage. */
+ public static final String KEY_PREFIX = "rebuild-sql-indexes-";
+
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /** MetaStorage synchronization mutex. */
+ private final Object metaStorageMux = new Object();
+
+ /** Node stop lock. */
+ private final GridBusyLock stopNodeLock = new GridBusyLock();
+
+ /** Current states. Mapping: cache name -> index rebuild state. */
+ private final ConcurrentMap<String, IndexRebuildState> states = new ConcurrentHashMap<>();
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+ public IndexRebuildStateStorage(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /**
+ * Callback on start of {@link GridQueryProcessor}.
+ */
+ public void start() {
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+ }
+
+ /**
+ * Callback on start of {@link GridCacheProcessor}.
+ */
+ public void onCacheKernalStart() {
+ Set<String> toComplete = new HashSet<>(states.keySet());
+ toComplete.removeAll(ctx.cache().cacheDescriptors().keySet());
+
+ for (String cacheName : toComplete)
+ onFinishRebuildIndexes(cacheName);
+ }
+
+ /**
+ * Callback on kernel stop.
+ */
+ public void stop() {
+ stopNodeLock.block();
+ }
+
+ /**
+ * Callback on start of rebuild cache indexes.
+ * <p/>
+ * Adding an entry that rebuilding the cache indexes in progress.
+ * If the cache is persistent, then add this entry to the MetaStorage.
+ *
+ * @param cacheCtx Cache context.
+ * @see #onFinishRebuildIndexes
+ */
+ public void onStartRebuildIndexes(GridCacheContext cacheCtx) {
+ if (!stopNodeLock.enterBusy())
+ throw new IgniteException("Node is stopping.");
+
+ try {
+ String cacheName = cacheCtx.name();
+ boolean persistent = CU.isPersistentCache(cacheCtx.config(), ctx.config().getDataStorageConfiguration());
+
+ states.compute(cacheName, (k, prev) -> {
+ if (prev != null) {
+ prev.state(INIT);
+
+ return prev;
+ }
+ else
+ return new IndexRebuildState(persistent);
+ });
+
+ if (persistent) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.write(metaStorageKey(cacheName), new IndexRebuildCacheInfo(cacheName));
+ });
+ }
+ }
+ finally {
+ stopNodeLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Callback on finish of rebuild cache indexes.
+ * <p/>
+ * If the cache is persistent, then we mark that the rebuilding of the
+ * indexes is completed and the entry will be deleted from the MetaStorage
+ * at the end of the checkpoint. Otherwise, delete the index rebuild entry.
+ *
+ * @param cacheName Cache name.
+ * @see #onStartRebuildIndexes
+ */
+ public void onFinishRebuildIndexes(String cacheName) {
+ states.compute(cacheName, (k, prev) -> {
+ if (prev != null && prev.persistent()) {
+ prev.state(INIT, COMPLETED);
+
+ return prev;
+ }
+ else
+ return null;
+ });
+ }
+
+ /**
+ * Check if rebuilding of indexes for the cache has been completed.
+ *
+ * @param cacheName Cache name.
+ * @return {@code True} if completed.
+ */
+ public boolean completed(String cacheName) {
+ IndexRebuildState state = states.get(cacheName);
+
+ return state == null || state.state() != INIT;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
+ ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+ if (!stopNodeLock.enterBusy())
+ return;
+
+ try {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.iterate(
+ KEY_PREFIX,
+ (k, v) -> {
+ IndexRebuildCacheInfo cacheInfo = (IndexRebuildCacheInfo)v;
+
+ states.put(cacheInfo.cacheName(), new IndexRebuildState(true));
+ },
+ true
+ );
+ });
+ }
+ finally {
+ stopNodeLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ if (!stopNodeLock.enterBusy())
+ return;
+
+ try {
+ for (IndexRebuildState state : states.values()) {
+ if (state.state(COMPLETED, DELETE))
+ assert state.persistent();
+ }
+ }
+ finally {
+ stopNodeLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterCheckpointEnd(Context ctx) {
+ if (!stopNodeLock.enterBusy())
+ return;
+
+ try {
+ for (String cacheName : states.keySet()) {
+ // Trying to concurrently delete the state.
+ IndexRebuildState newVal =
+ states.compute(cacheName, (k, prev) -> prev != null && prev.state() == DELETE ? null : prev);
+
+ // Assume that the state has been deleted.
+ if (newVal == null) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ if (!states.containsKey(cacheName))
+ metaStorage.remove(metaStorageKey(cacheName));
+ });
+ }
+ }
+ }
+ finally {
+ stopNodeLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Performing an operation on a {@link MetaStorage}.
+ * Guarded by {@link #metaStorageMux}.
+ *
+ * @param consumer MetaStorage operation, argument can be {@code null}.
+ * @throws IgniteException If an exception is thrown from the {@code consumer}.
+ */
+ private void metaStorageOperation(IgniteThrowableConsumer<MetaStorage> consumer) {
+ synchronized (metaStorageMux) {
+ IgniteCacheDatabaseSharedManager db = ctx.cache().context().database();
+
+ db.checkpointReadLock();
+
+ try {
+ consumer.accept(db.metaStorage());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+ }
+ }
+
+ /**
+ * Getting MetaStorage key for index rebuild state.
+ *
+ * @param cacheName Cache name.
+ * @return MetaStorage key.
+ */
+ private static String metaStorageKey(String cacheName) {
+ return KEY_PREFIX + cacheName;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/function/ThrowableFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/util/function/ThrowableFunction.java
new file mode 100644
index 0000000..f7bf372
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/function/ThrowableFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.function;
+
+/**
+ * Specific interface for transmitting exceptions from lambda to external method without a catch.
+ *
+ * @param <T> The type of the input to the function.
+ * @param <R> The type of the result of the function.
+ * @param <E> The exception to be thrown from the body of the function.
+ */
+@FunctionalInterface
+public interface ThrowableFunction<R, T, E extends Exception> {
+ /**
+ * Applies this function to the given argument.
+ *
+ * @param t The function argument.
+ * @return The function result.
+ * @throws E If failed.
+ */
+ R apply(T t) throws E;
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
index 8b389b8..be2c25d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
@@ -28,15 +28,12 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -488,30 +485,6 @@ public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractT
}
/**
- * Performing an operation on a MetaStorage.
- *
- * @param n Node.
- * @param fun Function for working with MetaStorage, the argument can be {@code null}.
- * @return The function result.
- * @throws IgniteCheckedException If failed.
- */
- private <R> R metaStorageOperation(
- IgniteEx n,
- IgniteThrowableFunction<MetaStorage, R> fun
- ) throws IgniteCheckedException {
- GridCacheDatabaseSharedManager dbMgr = dbMgr(n);
-
- dbMgr.checkpointReadLock();
-
- try {
- return fun.apply(dbMgr.metaStorage());
- }
- finally {
- dbMgr.checkpointReadUnlock();
- }
- }
-
- /**
* Getting {@code CheckpointManager#checkpointWorkflow}.
*
* @param n Node.
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index facf22b..3800cfb 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -2200,6 +2200,23 @@ public final class GridTestUtils {
}
/**
+ * Removing the directory cache groups.
+ * Deletes all directory satisfy the {@code cacheGrpFilter}.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param cacheGrpFilter Filter cache groups.
+ * @throws Exception If failed.
+ */
+ public static void deleteCacheGrpDir(String igniteInstanceName, FilenameFilter cacheGrpFilter) throws Exception {
+ File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+
+ String nodeDirName = U.maskForFileName(igniteInstanceName);
+
+ for (File cacheGrpDir : new File(workDir, nodeDirName).listFiles(cacheGrpFilter))
+ U.delete(cacheGrpDir);
+ }
+
+ /**
* {@link Class#getSimpleName()} does not return outer class name prefix for inner classes, for example,
* getSimpleName() returns "RegularDiscovery" instead of "GridDiscoveryManagerSelfTest$RegularDiscovery"
* This method return correct simple name for inner classes.
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 0f02435..ad9e023 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -108,6 +108,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCach
import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -116,6 +117,7 @@ import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.service.IgniteServiceProcessor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
@@ -2724,4 +2726,28 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
protected GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
}
+
+ /**
+ * Performing an operation on a MetaStorage.
+ *
+ * @param n Node.
+ * @param fun Function for working with MetaStorage, the argument can be {@code null}.
+ * @return The function result.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected <R> R metaStorageOperation(
+ IgniteEx n,
+ IgniteThrowableFunction<MetaStorage, R> fun
+ ) throws IgniteCheckedException {
+ GridCacheDatabaseSharedManager dbMgr = dbMgr(n);
+
+ dbMgr.checkpointReadLock();
+
+ try {
+ return fun.apply(dbMgr.metaStorage());
+ }
+ finally {
+ dbMgr.checkpointReadUnlock();
+ }
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
new file mode 100644
index 0000000..bb6c34a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.BreakRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
+import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+
+/**
+ * Base class for testing index rebuilds.
+ */
+public abstract class AbstractRebuildIndexTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ ).setCacheConfiguration(cacheCfg(DEFAULT_CACHE_NAME, null));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteEx startGrid(int idx) throws Exception {
+ IgniteEx n = super.startGrid(idx);
+
+ n.cluster().state(ACTIVE);
+
+ return n;
+ }
+
+ /**
+ * Registering a {@link StopRebuildIndexConsumer} for cache.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @return New instance of {@link StopRebuildIndexConsumer}.
+ */
+ protected StopRebuildIndexConsumer addStopRebuildIndexConsumer(IgniteEx n, String cacheName) {
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+
+ addCacheRowConsumer(nodeName(n), cacheName, stopRebuildIdxConsumer);
+
+ return stopRebuildIdxConsumer;
+ }
+
+ /**
+ * Registering a {@link BreakRebuildIndexConsumer} for cache.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @param breakPred Predicate for throwing an {@link IgniteCheckedException}.
+ * @return New instance of {@link BreakRebuildIndexConsumer}.
+ */
+ protected BreakRebuildIndexConsumer addBreakRebuildIndexConsumer(
+ IgniteEx n,
+ String cacheName,
+ IgniteThrowableBiPredicate<BreakRebuildIndexConsumer, CacheDataRow> breakPred
+ ) {
+ BreakRebuildIndexConsumer breakRebuildIdxConsumer = new BreakRebuildIndexConsumer(getTestTimeout(), breakPred);
+
+ addCacheRowConsumer(nodeName(n), cacheName, breakRebuildIdxConsumer);
+
+ return breakRebuildIdxConsumer;
+ }
+
+ /**
+ * Checking that rebuilding indexes for the cache has started.
+ *
+ * @param n Node.
+ * @param cacheCtx Cache context.
+ * @return Rebuild index future.
+ */
+ protected IgniteInternalFuture<?> checkStartRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx) {
+ IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ assertNotNull(idxRebFut);
+ assertFalse(idxRebFut.isDone());
+
+ checkCacheMetrics0(n, cacheCtx.name(), true, 0L);
+
+ return idxRebFut;
+ }
+
+ /**
+ * Checking metrics rebuilding indexes of cache.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @param expIdxRebuildInProgress The expected status of rebuilding indexes.
+ * @param expIdxRebuildKeysProcessed The expected number of keys processed during index rebuilding.
+ */
+ protected void checkCacheMetrics0(
+ IgniteEx n,
+ String cacheName,
+ boolean expIdxRebuildInProgress,
+ @Nullable Long expIdxRebuildKeysProcessed
+ ) {
+ CacheMetricsImpl metrics0 = cacheMetrics0(n, cacheName);
+ assertNotNull(metrics0);
+
+ assertEquals(expIdxRebuildInProgress, metrics0.isIndexRebuildInProgress());
+
+ if (expIdxRebuildKeysProcessed != null)
+ assertEquals(expIdxRebuildKeysProcessed.longValue(), metrics0.getIndexRebuildKeysProcessed());
+ }
+
+ /**
+ * Checking that the rebuild of indexes for the cache has completed.
+ *
+ * @param n Node.
+ * @param cacheCtx Cache context.
+ * @param expKeys The expected number of keys processed during index rebuilding
+ */
+ protected void checkFinishRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx, int expKeys) {
+ assertNull(indexRebuildFuture(n, cacheCtx.cacheId()));
+
+ checkCacheMetrics0(n, cacheCtx.name(), false, (long)expKeys);
+ }
+
+ /**
+ * Stopping all nodes and deleting their index.bin.
+ *
+ * @throws Exception If failed.
+ */
+ protected void stopAllGridsWithDeleteIndexBin() throws Exception {
+ List<String> igniteInstanceNames = G.allGrids().stream().map(Ignite::name).collect(toList());
+
+ stopAllGrids();
+
+ for (String n : igniteInstanceNames)
+ deleteIndexBin(n);
+ }
+
+ /**
+ * Create cache configuration with index: {@link Integer} -> {@link Person}.
+ *
+ * @param cacheName Cache name.
+ * @param grpName Group name.
+ * @return New instance of the cache configuration.
+ */
+ protected <K, V> CacheConfiguration<K, V> cacheCfg(String cacheName, @Nullable String grpName) {
+ CacheConfiguration<K, V> cacheCfg = new CacheConfiguration<>(cacheName);
+
+ return cacheCfg.setGroupName(grpName).setIndexedTypes(Integer.class, Person.class);
+ }
+
+ /**
+ * Populate cache with {@link Person} sequentially.
+ *
+ * @param cache Cache.
+ * @param cnt Entry count.
+ */
+ protected void populate(IgniteCache<Integer, Person> cache, int cnt) {
+ for (int i = 0; i < cnt; i++)
+ cache.put(i, new Person(i, "name_" + i));
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
index 8f012a7..60cf450 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
@@ -17,68 +17,23 @@
package org.apache.ignite.internal.processors.cache.index;
-import org.apache.ignite.client.Person;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cache.query.index.IndexProcessor;
-import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
-import org.apache.ignite.internal.processors.query.IndexRebuildAware;
+import org.apache.ignite.internal.processors.query.aware.IndexRebuildFutureStorage;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.Collections.emptyList;
-import static org.apache.ignite.cluster.ClusterState.ACTIVE;
-import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
-import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
-import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.prepareBeforeNodeStart;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
* Class for testing forced rebuilding of indexes.
*/
-public class ForceRebuildIndexTest extends GridCommonAbstractTest {
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
-
- stopAllGrids();
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
-
- stopAllGrids();
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- return super.getConfiguration(igniteInstanceName)
- .setConsistentId(igniteInstanceName)
- .setFailureHandler(new StopNodeFailureHandler())
- .setDataStorageConfiguration(
- new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
- ).setCacheConfiguration(
- new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Person.class)
- );
- }
-
+public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
/**
* Checking that a forced rebuild of indexes is possible only after the previous one has finished.
*
@@ -86,14 +41,15 @@ public class ForceRebuildIndexTest extends GridCommonAbstractTest {
*/
@Test
public void testSequentialForceRebuildIndexes() throws Exception {
- IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+ prepareBeforeNodeStart();
+
+ IgniteEx n = startGrid(0);
- IgniteEx n = prepareCluster(100);
+ populate(n.cache(DEFAULT_CACHE_NAME), 100);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
- addCacheRowConsumer(nodeName(n), cacheCtx.name(), stopRebuildIdxConsumer);
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
// The forced rebuild has begun - no rejected.
assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
@@ -138,18 +94,17 @@ public class ForceRebuildIndexTest extends GridCommonAbstractTest {
*/
@Test
public void testForceRebuildIndexesAfterExchange() throws Exception {
- IgniteEx n = prepareCluster(100);
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), 100);
- stopAllGrids();
- deleteIndexBin(n.context().igniteInstanceName());
+ stopAllGridsWithDeleteIndexBin();
- IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+ prepareBeforeNodeStart();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
- addCacheRowConsumer(nodeName(n), DEFAULT_CACHE_NAME, stopRebuildIdxConsumer);
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, DEFAULT_CACHE_NAME);
n = startGrid(0);
- n.cluster().state(ACTIVE);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
@@ -197,18 +152,17 @@ public class ForceRebuildIndexTest extends GridCommonAbstractTest {
*/
@Test
public void testSequentialRebuildIndexesOnExchange() throws Exception {
- IgniteEx n = prepareCluster(100);
+ IgniteEx n = startGrid(0);
- stopAllGrids();
- deleteIndexBin(n.context().igniteInstanceName());
+ populate(n.cache(DEFAULT_CACHE_NAME), 100);
- IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+ stopAllGridsWithDeleteIndexBin();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
- addCacheRowConsumer(nodeName(n), DEFAULT_CACHE_NAME, stopRebuildIdxConsumer);
+ prepareBeforeNodeStart();
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, DEFAULT_CACHE_NAME);
n = startGrid(0);
- n.cluster().state(ACTIVE);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
@@ -230,76 +184,6 @@ public class ForceRebuildIndexTest extends GridCommonAbstractTest {
}
/**
- * Prepare cluster for test.
- *
- * @param keys Key count.
- * @return Coordinator.
- * @throws Exception If failed.
- */
- private IgniteEx prepareCluster(int keys) throws Exception {
- IgniteEx n = startGrid(0);
-
- n.cluster().state(ACTIVE);
-
- for (int i = 0; i < keys; i++)
- n.cache(DEFAULT_CACHE_NAME).put(i, new Person(i, "name_" + i));
-
- return n;
- }
-
- /**
- * Checking metrics rebuilding indexes of cache.
- *
- * @param n Node.
- * @param cacheName Cache name.
- * @param expIdxRebuildInProgress The expected status of rebuilding indexes.
- * @param expIdxRebuildKeysProcessed The expected number of keys processed during index rebuilding.
- */
- private void checkCacheMetrics0(
- IgniteEx n,
- String cacheName,
- boolean expIdxRebuildInProgress,
- long expIdxRebuildKeysProcessed
- ) {
- CacheMetricsImpl metrics0 = cacheMetrics0(n, cacheName);
- assertNotNull(metrics0);
-
- assertEquals(expIdxRebuildInProgress, metrics0.isIndexRebuildInProgress());
- assertEquals(expIdxRebuildKeysProcessed, metrics0.getIndexRebuildKeysProcessed());
- }
-
- /**
- * Checking that rebuilding indexes for the cache has started.
- *
- * @param n Node.
- * @param cacheCtx Cache context.
- * @return Rebuild index future.
- */
- private IgniteInternalFuture<?> checkStartRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx) {
- IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, cacheCtx.cacheId());
-
- assertNotNull(idxRebFut);
- assertFalse(idxRebFut.isDone());
-
- checkCacheMetrics0(n, cacheCtx.name(), true, 0);
-
- return idxRebFut;
- }
-
- /**
- * Checking that the rebuild of indexes for the cache has completed.
- *
- * @param n Node.
- * @param cacheCtx Cache context.
- * @param expKeys The expected number of keys processed during index rebuilding
- */
- private void checkFinishRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx, int expKeys) {
- assertNull(indexRebuildFuture(n, cacheCtx.cacheId()));
-
- checkCacheMetrics0(n, cacheCtx.name(), false, expKeys);
- }
-
- /**
* Checking the contents of the cache in {@code GridQueryProcessor#idxRebuildOnExchange}.
* Allows to check if the cache will be marked, that the rebuild for it should be after the exchange.
*
@@ -308,7 +192,7 @@ public class ForceRebuildIndexTest extends GridCommonAbstractTest {
* @param expContains Whether a cache is expected.
*/
private void checkRebuildAfterExchange(IgniteEx n, int cacheId, boolean expContains) {
- IndexRebuildAware idxRebuildAware = getFieldValue(n.context().query(), "idxRebuildAware");
+ IndexRebuildFutureStorage idxRebuildAware = getFieldValue(n.context().query(), "idxRebuildFutStorage");
GridDhtPartitionsExchangeFuture exhFut = n.context().cache().context().exchange().lastTopologyFuture();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
index 65c0b6b..2596cf0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
@@ -24,12 +24,14 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -99,6 +101,13 @@ class IndexesRebuildTaskEx extends IndexesRebuildTask {
}
/**
+ * Set {@link IndexesRebuildTaskEx} to {@link IndexProcessor#idxRebuildCls} before starting the node.
+ */
+ static void prepareBeforeNodeStart() {
+ IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+ }
+
+ /**
* Registering a consumer for cache rows when rebuilding indexes on a node.
*
* @param nodeName The name of the node,
@@ -204,4 +213,39 @@ class IndexesRebuildTaskEx extends IndexesRebuildTask {
finishRebuildIdxFut.reset();
}
}
+
+ /**
+ * Consumer breaking index rebuild for the cache.
+ */
+ static class BreakRebuildIndexConsumer extends StopRebuildIndexConsumer {
+ /** Predicate for throwing an {@link IgniteCheckedException}. */
+ final IgniteThrowableBiPredicate<BreakRebuildIndexConsumer, CacheDataRow> brakePred;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout The maximum time to wait finish future in milliseconds.
+ * @param brakePred Predicate for throwing an {@link IgniteCheckedException}.
+ */
+ BreakRebuildIndexConsumer(
+ long timeout,
+ IgniteThrowableBiPredicate<BreakRebuildIndexConsumer, CacheDataRow> brakePred
+ ) {
+ super(timeout);
+
+ this.brakePred = brakePred;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+ startRebuildIdxFut.onDone();
+
+ finishRebuildIdxFut.get(timeout);
+
+ visitCnt.incrementAndGet();
+
+ if (brakePred.test(this, row))
+ throw new IgniteCheckedException("From test.");
+ }
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java
new file mode 100644
index 0000000..f133122
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.BreakRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.query.aware.IndexRebuildStateStorage;
+import org.apache.ignite.internal.util.function.ThrowableFunction;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.prepareBeforeNodeStart;
+import static org.apache.ignite.internal.processors.query.aware.IndexRebuildStateStorage.KEY_PREFIX;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.deleteCacheGrpDir;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing rebuilding index resumes.
+ */
+public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
+ /**
+ * Checking normal flow for {@link IndexRebuildStateStorage}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNormalFlowIndexRebuildStateStorage() throws Exception {
+ prepareBeforeNodeStart();
+
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), 1_000);
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
+ IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ idxRebFut.get(getTestTimeout());
+
+ assertEquals(1_000, stopRebuildIdxConsumer.visitCnt.get());
+ assertTrue(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+ forceCheckpoint();
+
+ assertNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+ }
+
+ /**
+ * Checking the flow in case of an error for {@link IndexRebuildStateStorage}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testErrorFlowIndexRebuildStateStorage() throws Exception {
+ prepareBeforeNodeStart();
+
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), 1_000);
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ BreakRebuildIndexConsumer breakRebuildIdxConsumer =
+ addBreakRebuildIndexConsumer(n, cacheCtx.name(), (c, row) -> c.visitCnt.get() > 10);
+
+ assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
+ IgniteInternalFuture<?> idxRebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ assertThrows(log, () -> idxRebFut0.get(getTestTimeout()), Throwable.class, null);
+ assertTrue(breakRebuildIdxConsumer.visitCnt.get() < 1_000);
+
+ forceCheckpoint();
+
+ assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
+ IgniteInternalFuture<?> idxRebFut1 = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ idxRebFut1.get(getTestTimeout());
+
+ assertEquals(1_000, stopRebuildIdxConsumer.visitCnt.get());
+ assertTrue(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+ forceCheckpoint();
+
+ assertNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+ }
+
+ /**
+ * Checking the flow in case of an restart node for {@link IndexRebuildStateStorage}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestartNodeFlowIndexRebuildStateStorage() throws Exception {
+ prepareBeforeNodeStart();
+
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), 1_000);
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ BreakRebuildIndexConsumer breakRebuildIdxConsumer =
+ addBreakRebuildIndexConsumer(n, cacheCtx.name(), (c, row) -> c.visitCnt.get() > 10);
+
+ assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
+ IgniteInternalFuture<?> idxRebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ assertThrows(log, () -> idxRebFut0.get(getTestTimeout()), Throwable.class, null);
+
+ forceCheckpoint();
+
+ assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ stopAllGrids();
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+
+ prepareBeforeNodeStart();
+ n = startGrid(0);
+
+ assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ IgniteInternalFuture<?> idxRebFut1 = indexRebuildFuture(n, cacheCtx.cacheId());
+
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+ stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ idxRebFut1.get(getTestTimeout());
+
+ assertEquals(1_000, stopRebuildIdxConsumer.visitCnt.get());
+ assertTrue(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+ forceCheckpoint();
+
+ assertNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
+ }
+
+ /**
+ * Checks that rebuilding indexes will be automatically started after
+ * restarting the node due to the fact that the previous one did not
+ * complete successfully.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNodeRestart() throws Exception {
+ checkRestartRebuildIndexes(1, n -> {
+ stopAllGrids();
+
+ prepareBeforeNodeStart();
+
+ return startGrid(0);
+ });
+ }
+
+ /**
+ * Checks that rebuilding indexes will be automatically started after
+ * reactivation the node due to the fact that the previous one did not
+ * complete successfully.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNodeReactivation() throws Exception {
+ checkRestartRebuildIndexes(1, n -> {
+ n.cluster().state(INACTIVE);
+
+ n.cluster().state(ACTIVE);
+
+ return n;
+ });
+ }
+
+ /**
+ * Checks that rebuilding indexes will be automatically started after
+ * restarting the node due to the fact that the previous one did not
+ * complete successfully. Two-node cluster, only one node will be restarted.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTwoNodeRestart() throws Exception {
+ checkRestartRebuildIndexes(2, n -> {
+ String nodeName = n.name();
+
+ stopGrid(nodeName);
+
+ prepareBeforeNodeStart();
+
+ return startGrid(nodeName);
+ });
+ }
+
+ /**
+ * Checks that rebuilding indexes will be automatically started after
+ * reactivation the node due to the fact that the previous one did not
+ * complete successfully. Two-node cluster.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTwoNodeReactivation() throws Exception {
+ checkRestartRebuildIndexes(2, n -> {
+ n.cluster().state(INACTIVE);
+
+ n.cluster().state(ACTIVE);
+
+ return n;
+ });
+ }
+
+ /**
+ * Checks that when the caches are destroyed,
+ * the index rebuild states will also be deleted.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeleteIndexRebuildStateOnDestroyCache() throws Exception {
+ IgniteEx n0 = startGrid(getTestIgniteInstanceName(0));
+
+ prepareBeforeNodeStart();
+ IgniteEx n1 = startGrid(getTestIgniteInstanceName(1));
+
+ n0.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < 4; i++) {
+ String cacheName = DEFAULT_CACHE_NAME + i;
+
+ String grpName = i == 1 || i == 2 ? DEFAULT_CACHE_NAME + "_G" : null;
+
+ populate(n1.getOrCreateCache(cacheCfg(cacheName, grpName)), 10_000);
+
+ BreakRebuildIndexConsumer breakRebuildIdxConsumer =
+ addBreakRebuildIndexConsumer(n1, cacheName, (c, r) -> c.visitCnt.get() > 10);
+
+ IgniteInternalCache<?, ?> cachex = n1.cachex(cacheName);
+
+ int cacheSize = cachex.size();
+
+ assertTrue(forceRebuildIndexes(n1, cachex.context()).isEmpty());
+
+ IgniteInternalFuture<?> rebIdxFut = indexRebuildFuture(n1, cachex.context().cacheId());
+
+ breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ assertThrows(log, () -> rebIdxFut.get(getTestTimeout()), Throwable.class, null);
+ assertTrue(breakRebuildIdxConsumer.visitCnt.get() < cacheSize);
+ }
+
+ n1.destroyCache(DEFAULT_CACHE_NAME + 0);
+ assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 0));
+
+ n1.destroyCache(DEFAULT_CACHE_NAME + 1);
+ assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 1));
+
+ assertFalse(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 2));
+ assertFalse(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 3));
+
+ forceCheckpoint(n1);
+
+ ConcurrentMap<String, Object> states = getFieldValue(indexRebuildStateStorage(n1), "states");
+
+ assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 0));
+ assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 1));
+
+ assertTrue(states.containsKey(DEFAULT_CACHE_NAME + 2));
+ assertTrue(states.containsKey(DEFAULT_CACHE_NAME + 3));
+
+ stopGrid(1);
+ awaitPartitionMapExchange();
+
+ n0.destroyCache(DEFAULT_CACHE_NAME + 2);
+ n0.destroyCache(DEFAULT_CACHE_NAME + 3);
+
+ deleteCacheGrpDir(
+ n1.name(),
+ (dir, name) -> name.contains(DEFAULT_CACHE_NAME + 3) || name.contains(DEFAULT_CACHE_NAME + "_G")
+ );
+
+ n1 = startGrid(getTestIgniteInstanceName(1));
+
+ assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 2));
+ assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 3));
+
+ forceCheckpoint(n1);
+
+ states = getFieldValue(indexRebuildStateStorage(n1), "states");
+
+ assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 2));
+ assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 3));
+ }
+
+ /**
+ * Check that for node the index rebuilding will be restarted
+ * automatically after executing the function on the node.
+ *
+ * @param nodeCnt Node count.
+ * @param function Function for node.
+ * @throws Exception If failed.
+ */
+ private void checkRestartRebuildIndexes(
+ int nodeCnt,
+ ThrowableFunction<IgniteEx, IgniteEx, Exception> function
+ ) throws Exception {
+ assertTrue(nodeCnt > 0);
+
+ for (int i = 0; i < nodeCnt - 1; i++)
+ startGrid(getTestIgniteInstanceName(i + 1));
+
+ prepareBeforeNodeStart();
+
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), 10_000);
+
+ populate(n.getOrCreateCache(cacheCfg(DEFAULT_CACHE_NAME + 0, null)), 10_000);
+
+ if (nodeCnt > 1)
+ awaitPartitionMapExchange();
+
+ IgniteInternalCache<?, ?> cachex0 = n.cachex(DEFAULT_CACHE_NAME);
+ IgniteInternalCache<?, ?> cachex1 = n.cachex(DEFAULT_CACHE_NAME + 0);
+
+ int cacheSize0 = cachex0.size();
+ int cacheSize1 = cachex1.size();
+ assertTrue(String.valueOf(cacheSize0), cacheSize0 >= 1_000);
+ assertTrue(String.valueOf(cacheSize1), cacheSize1 >= 1_000);
+
+ GridCacheContext<?, ?> cacheCtx0 = cachex0.context();
+ GridCacheContext<?, ?> cacheCtx1 = cachex1.context();
+
+ BreakRebuildIndexConsumer breakRebuildIdxConsumer =
+ addBreakRebuildIndexConsumer(n, cacheCtx0.name(), (c, row) -> c.visitCnt.get() >= 10);
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer0 = addStopRebuildIndexConsumer(n, cacheCtx1.name());
+
+ assertTrue(forceRebuildIndexes(n, cacheCtx0, cacheCtx1).isEmpty());
+
+ IgniteInternalFuture<?> rebIdxFut0 = indexRebuildFuture(n, cacheCtx0.cacheId());
+ IgniteInternalFuture<?> rebIdxFut1 = indexRebuildFuture(n, cacheCtx1.cacheId());
+
+ breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+ stopRebuildIdxConsumer0.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer0.finishRebuildIdxFut.onDone();
+
+ assertThrows(log, () -> rebIdxFut0.get(getTestTimeout()), Throwable.class, null);
+ assertTrue(breakRebuildIdxConsumer.visitCnt.get() < cacheSize0);
+
+ rebIdxFut1.get(getTestTimeout());
+ assertEquals(cacheSize1, stopRebuildIdxConsumer0.visitCnt.get());
+
+ StopRebuildIndexConsumer stopRebuildIdxConsumer1 = addStopRebuildIndexConsumer(n, cacheCtx0.name());
+ stopRebuildIdxConsumer0.resetFutures();
+
+ forceCheckpoint();
+
+ n = function.apply(n);
+
+ IgniteInternalFuture<?> rebIdxFut01 = indexRebuildFuture(n, cacheCtx0.cacheId());
+ IgniteInternalFuture<?> rebIdxFut11 = indexRebuildFuture(n, cacheCtx1.cacheId());
+
+ stopRebuildIdxConsumer1.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer1.finishRebuildIdxFut.onDone();
+
+ assertThrows(
+ log,
+ () -> stopRebuildIdxConsumer0.startRebuildIdxFut.get(1_000),
+ IgniteFutureTimeoutCheckedException.class,
+ null
+ );
+ stopRebuildIdxConsumer0.finishRebuildIdxFut.onDone();
+
+ rebIdxFut01.get(getTestTimeout());
+ assertEquals(cacheSize0, stopRebuildIdxConsumer1.visitCnt.get());
+
+ assertNull(rebIdxFut11);
+ assertEquals(cacheSize1, stopRebuildIdxConsumer0.visitCnt.get());
+ }
+
+ /**
+ * Getting {@code GridQueryProcessor#idxRebuildStateStorage}.
+ *
+ * @param n Node.
+ * @return Index rebuild state storage.
+ */
+ private IndexRebuildStateStorage indexRebuildStateStorage(IgniteEx n) {
+ return getFieldValue(n.context().query(), "idxRebuildStateStorage");
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
index 04fe0d7..27ea528 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
@@ -18,15 +18,8 @@
package org.apache.ignite.internal.processors.cache.index;
import java.util.Map;
-import org.apache.ignite.client.Person;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -39,15 +32,14 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
-import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRebuildRunner;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.prepareBeforeNodeStart;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -55,40 +47,7 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Class for checking the correct completion/stop of index rebuilding.
*/
-public class StopRebuildIndexTest extends GridCommonAbstractTest {
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
-
- stopAllGrids();
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
-
- stopAllGrids();
- cleanPersistenceDir();
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- return super.getConfiguration(igniteInstanceName)
- .setConsistentId(igniteInstanceName)
- .setFailureHandler(new StopNodeFailureHandler())
- .setDataStorageConfiguration(
- new DataStorageConfiguration()
- .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
- ).setCacheConfiguration(
- new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Person.class)
- );
- }
-
+public class StopRebuildIndexTest extends AbstractRebuildIndexTest {
/**
* Checks the correctness {@link SchemaIndexCacheCompoundFuture}.
*/
@@ -176,9 +135,11 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
*/
@Test
public void testInternalIndexingRebuildFuture() throws Exception {
- IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+ prepareBeforeNodeStart();
- IgniteEx n = prepareCluster(10);
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), 10);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
@@ -188,8 +149,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
() -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId()))
);
- StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
- addCacheRowConsumer(nodeName(n), cacheCtx.name(), stopRebuildIdxConsumer);
+ StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
forceRebuildIndexes(n, cacheCtx);
@@ -227,10 +187,13 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
IgniteThrowableConsumer<IgniteEx> stopRebuildIndexes,
boolean expThrowEx
) throws Exception {
- IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+ prepareBeforeNodeStart();
int keys = 100_000;
- IgniteEx n = prepareCluster(keys);
+
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(DEFAULT_CACHE_NAME), keys);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
@@ -280,23 +243,6 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
}
/**
- * Prepare cluster for test.
- *
- * @param keys Key count.
- * @throws Exception If failed.
- */
- private IgniteEx prepareCluster(int keys) throws Exception {
- IgniteEx n = startGrid(0);
-
- n.cluster().state(ACTIVE);
-
- for (int i = 0; i < keys; i++)
- n.cache(DEFAULT_CACHE_NAME).put(i, new Person(i, "p_" + i));
-
- return n;
- }
-
- /**
* Getting internal rebuild index future for the cache.
*
* @param n Node.
@@ -306,6 +252,8 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
@Nullable private SchemaIndexCacheFuture internalIndexRebuildFuture(IgniteEx n, int cacheId) {
IndexesRebuildTask idxRebuild = n.context().indexProcessor().idxRebuild();
- return ((Map<Integer, SchemaIndexCacheFuture>)getFieldValueHierarchy(idxRebuild, "idxRebuildFuts")).get(cacheId);
+ Object idxRebuildFuts = getFieldValueHierarchy(idxRebuild, "idxRebuildFuts");
+
+ return ((Map<Integer, SchemaIndexCacheFuture>)idxRebuildFuts).get(cacheId);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 69cbb71..982ed83 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexi
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
import org.apache.ignite.internal.processors.cache.index.ForceRebuildIndexTest;
+import org.apache.ignite.internal.processors.cache.index.ResumeRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.index.StopRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsIndexingDefragmentationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
@@ -68,7 +69,8 @@ import org.junit.runners.Suite;
IgnitePdsIndexingDefragmentationTest.class,
StopRebuildIndexTest.class,
ForceRebuildIndexTest.class,
- IgniteClusterSnapshotRestoreWithIndexingTest.class
+ IgniteClusterSnapshotRestoreWithIndexingTest.class,
+ ResumeRebuildIndexTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}