You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/06/21 11:20:33 UTC
[ignite] branch master updated: IGNITE-14702 Fixing the
inconsistency of the built new indexes after restarting the node (#9150)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a95242d IGNITE-14702 Fixing the inconsistency of the built new indexes after restarting the node (#9150)
a95242d is described below
commit a95242de4fc7a5256341f40a3e8d014d311ef379
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Jun 21 14:20:04 2021 +0300
IGNITE-14702 Fixing the inconsistency of the built new indexes after restarting the node (#9150)
---
.../org/apache/ignite/IgniteSystemProperties.java | 10 +
.../processors/query/GridQueryProcessor.java | 35 +-
.../query/aware/IndexBuildStatusHolder.java | 174 +++++++
...teStorage.java => IndexBuildStatusStorage.java} | 194 +++++---
.../processors/query/aware/IndexRebuildState.java | 104 -----
.../schema/SchemaIndexCachePartitionWorker.java | 9 +-
.../junits/common/GridCommonAbstractTest.java | 20 +-
.../cache/index/AbstractRebuildIndexTest.java | 118 ++++-
.../cache/index/ForceRebuildIndexTest.java | 28 +-
.../processors/cache/index/IgniteH2IndexingEx.java | 113 +++++
.../cache/index/IndexesRebuildTaskEx.java | 130 +-----
.../processors/cache/index/IndexingTestUtils.java | 179 ++++++++
.../cache/index/ResumeCreateIndexTest.java | 510 +++++++++++++++++++++
.../cache/index/ResumeRebuildIndexTest.java | 122 +++--
.../cache/index/StopRebuildIndexTest.java | 10 +-
.../IgnitePdsIndexingDefragmentationTest.java | 2 +
.../IgniteClusterSnapshotWithIndexesTest.java | 2 +
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
18 files changed, 1340 insertions(+), 424 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index f3c95e2..298c969 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.util.GridLogThrottle;
import org.apache.ignite.lang.IgniteExperimental;
@@ -124,6 +125,7 @@ import static org.apache.ignite.internal.processors.performancestatistics.FilePe
import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FILE_MAX_SIZE;
import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FLUSH_SIZE;
import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_INDEXING_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker.DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE;
import static org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TIMEOUT;
import static org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TOKEN_INVALIDATE_INTERVAL;
import static org.apache.ignite.internal.processors.rest.handlers.task.GridTaskCommandHandler.DFLT_MAX_TASK_RESULTS;
@@ -1993,6 +1995,14 @@ public final class IgniteSystemProperties {
public static final String IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD = "IGNITE_PERF_STAT_CACHED_STRINGS_THRESHOLD";
/**
+ * Count of rows, being processed within a single checkpoint lock when indexes are rebuilt.
+ * The default value is {@link SchemaIndexCachePartitionWorker#DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE}.
+ */
+ @SystemProperty(value = "Count of rows, being processed within a single checkpoint lock when indexes are rebuilt",
+ type = Integer.class, defaults = "" + DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE)
+ public static final String IGNITE_INDEX_REBUILD_BATCH_SIZE = "IGNITE_INDEX_REBUILD_BATCH_SIZE";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 916e595..ba6e808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -87,8 +87,8 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage;
import org.apache.ignite.internal.processors.query.aware.IndexRebuildFutureStorage;
-import org.apache.ignite.internal.processors.query.aware.IndexRebuildStateStorage;
import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -247,8 +247,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
/** Index rebuild futures. */
private final IndexRebuildFutureStorage idxRebuildFutStorage = new IndexRebuildFutureStorage();
- /** Index rebuild states. */
- private final IndexRebuildStateStorage idxRebuildStateStorage;
+ /** Index build statuses. */
+ private final IndexBuildStatusStorage idxBuildStatusStorage;
/**
* Constructor.
@@ -282,7 +282,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
U.warn(log, "Unsupported IO message: " + msg);
};
- idxRebuildStateStorage = new IndexRebuildStateStorage(ctx);
+ idxBuildStatusStorage = new IndexBuildStatusStorage(ctx);
}
/** {@inheritDoc} */
@@ -303,7 +303,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
ctxs.queries().evictDetailMetrics();
}, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ);
- idxRebuildStateStorage.start();
+ idxBuildStatusStorage.start();
registerMetadataForRegisteredCaches(false);
}
@@ -328,7 +328,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
busyLock.block();
- idxRebuildStateStorage.stop();
+ idxBuildStatusStorage.stop();
}
/** {@inheritDoc} */
@@ -357,7 +357,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
onSchemaPropose(schemaOp.proposeMessage());
}
- idxRebuildStateStorage.onCacheKernalStart();
+ idxBuildStatusStorage.onCacheKernalStart();
}
/**
@@ -1904,20 +1904,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
GridFutureAdapter<Void> createIdxFut = new GridFutureAdapter<>();
+ GridCacheContext<?, ?> cacheCtx = cacheInfo.cacheContext();
+
visitor = new SchemaIndexCacheVisitorImpl(
- cacheInfo.cacheContext(),
+ cacheCtx,
cancelTok,
createIdxFut
) {
/** {@inheritDoc} */
@Override public void visit(SchemaIndexCacheVisitorClosure clo) {
- super.visit(clo);
+ idxBuildStatusStorage.onStartBuildNewIndex(cacheCtx);
try {
+ super.visit(clo);
+
buildIdxFut.get();
}
catch (Exception e) {
throw new IgniteException(e);
+ } finally {
+ idxBuildStatusStorage.onFinishBuildNewIndex(cacheName);
}
}
};
@@ -3895,7 +3901,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @see #rebuildIndexesCompleted
*/
public void onStartRebuildIndexes(GridCacheContext cacheCtx) {
- idxRebuildStateStorage.onStartRebuildIndexes(cacheCtx);
+ idxBuildStatusStorage.onStartRebuildIndexes(cacheCtx);
}
/**
@@ -3908,7 +3914,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param cacheCtx Cache context.
*/
public void onFinishRebuildIndexes(GridCacheContext cacheCtx) {
- idxRebuildStateStorage.onFinishRebuildIndexes(cacheCtx.name());
+ idxBuildStatusStorage.onFinishRebuildIndexes(cacheCtx.name());
}
/**
@@ -3916,11 +3922,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*
* @param cacheCtx Cache context.
* @return {@code True} if completed.
- * @see #onStartRebuildIndexes
- * @see #onFinishRebuildIndexes
*/
public boolean rebuildIndexesCompleted(GridCacheContext cacheCtx) {
- return idxRebuildStateStorage.completed(cacheCtx.name());
+ return idxBuildStatusStorage.rebuildCompleted(cacheCtx.name());
}
/**
@@ -3930,10 +3934,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* indexes is completed and the entry will be deleted from the MetaStorage
* at the end of the checkpoint. Otherwise, delete the index rebuild entry.
*
- *
* @param cacheName Cache name.
*/
public void completeRebuildIndexes(String cacheName) {
- idxRebuildStateStorage.onFinishRebuildIndexes(cacheName);
+ idxBuildStatusStorage.onFinishRebuildIndexes(cacheName);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java
new file mode 100644
index 0000000..732aa3c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.aware;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.COMPLETE;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.DELETE;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.INIT;
+
+/**
+ * Cache index build status.
+ * The following operations affect the status: rebuilding indexes and building a new index.
+ * <p/>
+ * If the operation starts executing, then the status is {@link Status#INIT INIT}.
+ * If all operations are completed, then the status is {@link Status#COMPLETE COMPLETED}.
+ * Status {@link Status#DELETE DELETE} is used for persistent cache to mark at the
+ * beginning of a checkpoint that all operations have been completed and they will be committed to it.
+ */
+public class IndexBuildStatusHolder {
+ /** Enumeration of statuses. */
+ public enum Status {
+ /** Initial status - operation(s) in progress. */
+ INIT,
+
+ /** All operations complete. */
+ COMPLETE,
+
+ /** To be deleted. */
+ DELETE
+ }
+
+ /** Persistent cache. */
+ private final boolean persistent;
+
+ /** Status. Guarded by {@code this}. */
+ private Status status;
+
+ /** Rebuilding indexes. Guarded by {@code this}. */
+ private boolean rebuild;
+
+ /** Count of new indexes being built. Guarded by {@code this}. */
+ private int newIdx;
+
+ /**
+ * Constructor.
+ *
+ * @param persistent Persistent cache.
+ * @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ */
+ public IndexBuildStatusHolder(boolean persistent, boolean rebuild) {
+ this.persistent = persistent;
+
+ onStartOperation(rebuild);
+ }
+
+ /**
+ * Callback on the start of of the operation.
+ *
+ * @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @see #onFinishOperation
+ */
+ public synchronized void onStartOperation(boolean rebuild) {
+ status = INIT;
+
+ if (rebuild)
+ this.rebuild = true;
+ else {
+ assert newIdx >= 0;
+
+ newIdx++;
+ }
+ }
+
+ /**
+ * Callback on the finish of the operation.
+ *
+ * @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @return Current status.
+ * @see #onStartOperation
+ */
+ public synchronized Status onFinishOperation(boolean rebuild) {
+ if (rebuild)
+ this.rebuild = false;
+ else {
+ assert newIdx > 0;
+
+ newIdx--;
+ }
+
+ if (!this.rebuild && newIdx == 0)
+ status = COMPLETE;
+
+ return status;
+ }
+
+ /**
+ * Change of status to {@link Status#DELETE} if the current status is {@link Status#COMPLETE}.
+ * Note that this will only be for persistent cache.
+ *
+ * @return {@code True} if successful.
+ */
+ public boolean delete() {
+ if (persistent) {
+ synchronized (this) {
+ if (status == COMPLETE) {
+ status = DELETE;
+
+ return true;
+ }
+ else
+ return false;
+ }
+ }
+ else
+ return false;
+ }
+
+ /**
+ * Getting the current status.
+ *
+ * @return Current status.
+ */
+ public synchronized Status status() {
+ return status;
+ }
+
+ /**
+ * Checking whether rebuilding indexes is in progress.
+ *
+ * @return {@code True} if in progress.
+ */
+ public synchronized boolean rebuild() {
+ return rebuild;
+ }
+
+ /**
+ * Getting the count of new indexes that are currently being built.
+ *
+ * @return Count of new indexes being built.
+ */
+ public synchronized int buildNewIndexes() {
+ return newIdx;
+ }
+
+ /**
+ * Checking if the cache is persistent.
+ *
+ * @return {@code True} if persistent.
+ */
+ public boolean persistent() {
+ return persistent;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ return S.toString(IndexBuildStatusHolder.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildStateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java
similarity index 61%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildStateStorage.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java
index 3a656ec..323994d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildStateStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java
@@ -38,23 +38,18 @@ import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.COMPLETED;
-import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.DELETE;
-import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.INIT;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.COMPLETE;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.DELETE;
/**
- * Holder of up-to-date information about rebuilding cache indexes.
- * Helps to avoid the situation when the index rebuilding process is interrupted
- * and after a node restart/reactivation the indexes will become inconsistent.
+ * Holder of up-to-date information about cache index building operations (rebuilding indexes, building new indexes).
+ * Avoids index inconsistency when operations have completed, a checkpoint has not occurred, and the node has been restarted.
* <p/>
- * To do this, before rebuilding the indexes, call {@link #onStartRebuildIndexes}
- * and after it {@link #onFinishRebuildIndexes}. Use {@link #completed} to check
- * if the index rebuild has completed.
- * <p/>
- * To prevent leaks, it is necessary to use {@link #onFinishRebuildIndexes}
- * when detecting the fact of destroying the cache.
+ * For rebuild indexes, use {@link #onStartRebuildIndexes} and {@link #onFinishRebuildIndexes}.
+ * For build new indexes, use {@link #onStartBuildNewIndex} and {@link #onFinishBuildNewIndex}.
+ * Use {@link #rebuildCompleted} to check that the index rebuild is complete.
*/
-public class IndexRebuildStateStorage implements MetastorageLifecycleListener, CheckpointListener {
+public class IndexBuildStatusStorage implements MetastorageLifecycleListener, CheckpointListener {
/** Key prefix for the MetaStorage. */
public static final String KEY_PREFIX = "rebuild-sql-indexes-";
@@ -67,15 +62,15 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
/** Node stop lock. */
private final GridBusyLock stopNodeLock = new GridBusyLock();
- /** Current states. Mapping: cache name -> index rebuild state. */
- private final ConcurrentMap<String, IndexRebuildState> states = new ConcurrentHashMap<>();
+ /** Current statuses. Mapping: cache name -> index build status. */
+ private final ConcurrentMap<String, IndexBuildStatusHolder> statuses = new ConcurrentHashMap<>();
/**
* Constructor.
*
* @param ctx Kernal context.
*/
- public IndexRebuildStateStorage(GridKernalContext ctx) {
+ public IndexBuildStatusStorage(GridKernalContext ctx) {
this.ctx = ctx;
}
@@ -90,7 +85,7 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
* Callback on start of {@link GridCacheProcessor}.
*/
public void onCacheKernalStart() {
- Set<String> toComplete = new HashSet<>(states.keySet());
+ Set<String> toComplete = new HashSet<>(statuses.keySet());
toComplete.removeAll(ctx.cache().cacheDescriptors().keySet());
for (String cacheName : toComplete)
@@ -105,65 +100,61 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
}
/**
- * Callback on start of rebuild cache indexes.
+ * Callback on the start of rebuilding cache indexes.
* <p/>
- * Adding an entry that rebuilding the cache indexes in progress.
- * If the cache is persistent, then add this entry to the MetaStorage.
+ * Registers the start of rebuilding cache indexes, for persistent cache
+ * writes a entry to the MetaStorage so that if a failure occurs,
+ * the indexes are automatically rebuilt.
*
* @param cacheCtx Cache context.
* @see #onFinishRebuildIndexes
*/
public void onStartRebuildIndexes(GridCacheContext cacheCtx) {
- if (!stopNodeLock.enterBusy())
- throw new IgniteException("Node is stopping.");
-
- try {
- String cacheName = cacheCtx.name();
- boolean persistent = CU.isPersistentCache(cacheCtx.config(), ctx.config().getDataStorageConfiguration());
-
- states.compute(cacheName, (k, prev) -> {
- if (prev != null) {
- prev.state(INIT);
-
- return prev;
- }
- else
- return new IndexRebuildState(persistent);
- });
-
- if (persistent) {
- metaStorageOperation(metaStorage -> {
- assert metaStorage != null;
+ onStartOperation(cacheCtx, true);
+ }
- metaStorage.write(metaStorageKey(cacheName), new IndexRebuildCacheInfo(cacheName));
- });
- }
- }
- finally {
- stopNodeLock.leaveBusy();
- }
+ /**
+ * Callback on the start of building a new cache index.
+ * <p/>
+ * Registers the start of building a new cache index, for persistent cache
+ * writes a entry to the MetaStorage so that if a failure occurs,
+ * the indexes are automatically rebuilt.
+ *
+ * @param cacheCtx Cache context.
+ * @see #onFinishBuildNewIndex
+ */
+ public void onStartBuildNewIndex(GridCacheContext cacheCtx) {
+ onStartOperation(cacheCtx, false);
}
/**
- * Callback on finish of rebuild cache indexes.
+ * Callback on the finish of rebuilding cache indexes.
* <p/>
- * If the cache is persistent, then we mark that the rebuilding of the
- * indexes is completed and the entry will be deleted from the MetaStorage
- * at the end of the checkpoint. Otherwise, delete the index rebuild entry.
+ * Registers the finish of rebuilding cache indexes, if all operations
+ * have been completed for persistent cache, then the entry will be deleted
+ * from the MetaStorage at the end of the checkpoint,
+ * otherwise for the in-memory cache the status will be deleted immediately.
*
* @param cacheName Cache name.
* @see #onStartRebuildIndexes
*/
public void onFinishRebuildIndexes(String cacheName) {
- states.compute(cacheName, (k, prev) -> {
- if (prev != null && prev.persistent()) {
- prev.state(INIT, COMPLETED);
+ onFinishOperation(cacheName, true);
+ }
- return prev;
- }
- else
- return null;
- });
+ /**
+ * Callback on the finish of building a new cache index.
+ * <p/>
+ * Registers the finish of building a new cache index, if all operations
+ * have been completed for persistent cache, then the entry will be deleted
+ * from the MetaStorage at the end of the checkpoint,
+ * otherwise for the in-memory cache the status will be deleted immediately.
+ *
+ * @param cacheName Cache name.
+ * @see #onStartBuildNewIndex
+ */
+ public void onFinishBuildNewIndex(String cacheName) {
+ onFinishOperation(cacheName, false);
}
/**
@@ -172,10 +163,10 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
* @param cacheName Cache name.
* @return {@code True} if completed.
*/
- public boolean completed(String cacheName) {
- IndexRebuildState state = states.get(cacheName);
+ public boolean rebuildCompleted(String cacheName) {
+ IndexBuildStatusHolder status = statuses.get(cacheName);
- return state == null || state.state() != INIT;
+ return status == null || !status.rebuild();
}
/** {@inheritDoc} */
@@ -197,7 +188,7 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
(k, v) -> {
IndexRebuildCacheInfo cacheInfo = (IndexRebuildCacheInfo)v;
- states.put(cacheInfo.cacheName(), new IndexRebuildState(true));
+ statuses.put(cacheInfo.cacheName(), new IndexBuildStatusHolder(true, true));
},
true
);
@@ -214,9 +205,9 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
return;
try {
- for (IndexRebuildState state : states.values()) {
- if (state.state(COMPLETED, DELETE))
- assert state.persistent();
+ for (IndexBuildStatusHolder status : statuses.values()) {
+ if (status.delete())
+ assert status.persistent();
}
}
finally {
@@ -240,17 +231,17 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
return;
try {
- for (String cacheName : states.keySet()) {
+ for (String cacheName : statuses.keySet()) {
// Trying to concurrently delete the state.
- IndexRebuildState newVal =
- states.compute(cacheName, (k, prev) -> prev != null && prev.state() == DELETE ? null : prev);
+ IndexBuildStatusHolder newVal =
+ statuses.compute(cacheName, (k, prev) -> prev != null && prev.status() == DELETE ? null : prev);
// Assume that the state has been deleted.
if (newVal == null) {
metaStorageOperation(metaStorage -> {
assert metaStorage != null;
- if (!states.containsKey(cacheName))
+ if (!statuses.containsKey(cacheName))
metaStorage.remove(metaStorageKey(cacheName));
});
}
@@ -295,4 +286,67 @@ public class IndexRebuildStateStorage implements MetastorageLifecycleListener, C
private static String metaStorageKey(String cacheName) {
return KEY_PREFIX + cacheName;
}
+
+ /**
+ * Callback on the start of the cache index building operation.
+ * <p/>
+ * Registers the start of an index build operation, for persistent cache
+ * writes a entry to the MetaStorage so that if a failure occurs,
+ * the indexes are automatically rebuilt.
+ *
+ * @param cacheCtx Cache context.
+ * @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @see #onFinishOperation
+ */
+ private void onStartOperation(GridCacheContext cacheCtx, boolean rebuild) {
+ if (!stopNodeLock.enterBusy())
+ throw new IgniteException("Node is stopping.");
+
+ try {
+ String cacheName = cacheCtx.name();
+ boolean persistent = CU.isPersistentCache(cacheCtx.config(), ctx.config().getDataStorageConfiguration());
+
+ statuses.compute(cacheName, (k, prev) -> {
+ if (prev != null) {
+ prev.onStartOperation(rebuild);
+
+ return prev;
+ }
+ else
+ return new IndexBuildStatusHolder(persistent, rebuild);
+ });
+
+ if (persistent) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.write(metaStorageKey(cacheName), new IndexRebuildCacheInfo(cacheName));
+ });
+ }
+ }
+ finally {
+ stopNodeLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Callback on the finish of the cache index building operation.
+ * <p/>
+ * Registers the finish of the index build operation, if all operations
+ * have been completed for persistent cache, then the entry will be deleted
+ * from the MetaStorage at the end of the checkpoint,
+ * otherwise for the in-memory cache the status will be deleted immediately.
+ *
+ * @param cacheName Cache name.
+ * @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @see #onStartOperation
+ */
+ private void onFinishOperation(String cacheName, boolean rebuild) {
+ statuses.compute(cacheName, (k, prev) -> {
+ if (prev != null && prev.onFinishOperation(rebuild) == COMPLETE && !prev.persistent())
+ return null;
+ else
+ return prev;
+ });
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildState.java
deleted file mode 100644
index 06a2b62..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildState.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.aware;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import static org.apache.ignite.internal.processors.query.aware.IndexRebuildState.State.INIT;
-
-/**
- * State of rebuilding indexes for the cache.
- */
-public class IndexRebuildState {
- /**
- * Enumeration of index rebuild states.
- */
- enum State {
- /** Initial state. */
- INIT,
-
- /** Completed. */
- COMPLETED,
-
- /** To be deleted. */
- DELETE
- }
-
- /** Index rebuild state state atomic updater. */
- private static final AtomicReferenceFieldUpdater<IndexRebuildState, State> STATE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(IndexRebuildState.class, State.class, "state");
-
- /** Persistent cache. */
- private final boolean persistent;
-
- /** Index rebuild state. */
- private volatile State state = INIT;
-
- /**
- * Constructor.
- *
- * @param persistent Persistent cache.
- */
- public IndexRebuildState(boolean persistent) {
- this.persistent = persistent;
- }
-
- /**
- * Checking if the cache is persistent.
- *
- * @return {@code True} if persistent.
- */
- public boolean persistent() {
- return persistent;
- }
-
- /**
- * Getting the state rebuild of indexes.
- *
- * @return Current state.
- */
- public State state() {
- return state;
- }
-
- /**
- * Setting the state rebuild of the indexes.
- *
- * @param state New state.
- */
- public void state(State state) {
- this.state = state;
- }
-
- /**
- * Atomically sets of the state rebuild of the indexes.
- *
- * @param exp Expected state.
- * @param newState New state.
- * @return {@code True} if successful.
- */
- public boolean state(State exp, State newState) {
- return STATE_UPDATER.compareAndSet(this, exp, newState);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IndexRebuildState.class, this);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
index e6e9f16..c8ed9c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -40,7 +40,9 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXTRA_INDEX_REBUILD_LOGGING;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEX_REBUILD_BATCH_SIZE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
@@ -52,8 +54,11 @@ import static org.apache.ignite.internal.processors.cache.persistence.CacheDataR
* Worker for creating/rebuilding indexes for cache per partition.
*/
public class SchemaIndexCachePartitionWorker extends GridWorker {
+ /** Default count of rows, being processed within a single checkpoint lock. */
+ public static final int DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE = 1_000;
+
/** Count of rows, being processed within a single checkpoint lock. */
- private static final int BATCH_SIZE = 1000;
+ private final int batchSize = getInteger(IGNITE_INDEX_REBUILD_BATCH_SIZE, DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE);
/** Cache context. */
private final GridCacheContext cctx;
@@ -185,7 +190,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
processKey(key);
- if (++cntr % BATCH_SIZE == 0) {
+ if (++cntr % batchSize == 0) {
cctx.shared().database().checkpointReadUnlock();
locked = false;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index ad9e023..d06f2f2 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -2162,14 +2162,20 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @throws IgniteCheckedException If checkpoint was failed.
*/
protected void forceCheckpoint(Collection<Ignite> nodes) throws IgniteCheckedException {
- for (Ignite ignite : nodes) {
- if (ignite.cluster().localNode().isClient())
- continue;
-
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
- .cache().context().database();
+ forceCheckpoint(nodes, "test");
+ }
- dbMgr.waitForCheckpoint("test");
+ /**
+ * Forces checkpoint on all specified nodes.
+ *
+ * @param nodes Nodes to force checkpoint on them.
+ * @param reason Reason for checkpoint wakeup if it would be required.
+ * @throws IgniteCheckedException If checkpoint was failed.
+ */
+ protected void forceCheckpoint(Collection<Ignite> nodes, String reason) throws IgniteCheckedException {
+ for (Ignite ignite : nodes) {
+ if (!ignite.cluster().localNode().isClient())
+ dbMgr((IgniteEx)ignite).waitForCheckpoint(reason);
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
index bb6c34a..085fdff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractRebuildIndexTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.index;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -31,19 +32,22 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.BreakRebuildIndexConsumer;
-import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.BreakBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.SlowdownBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.StopBuildIndexConsumer;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IgniteH2IndexingEx.addIdxCreateCacheRowConsumer;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
-import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.nodeName;
import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
* Base class for testing index rebuilds.
@@ -54,6 +58,7 @@ public abstract class AbstractRebuildIndexTest extends GridCommonAbstractTest {
super.beforeTest();
IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+ IgniteH2IndexingEx.clean(getTestIgniteInstanceName());
stopAllGrids();
cleanPersistenceDir();
@@ -64,6 +69,7 @@ public abstract class AbstractRebuildIndexTest extends GridCommonAbstractTest {
super.afterTest();
IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+ IgniteH2IndexingEx.clean(getTestIgniteInstanceName());
stopAllGrids();
cleanPersistenceDir();
@@ -90,38 +96,92 @@ public abstract class AbstractRebuildIndexTest extends GridCommonAbstractTest {
}
/**
- * Registering a {@link StopRebuildIndexConsumer} for cache.
+ * Registering a {@link StopBuildIndexConsumer} to {@link IndexesRebuildTaskEx#addCacheRowConsumer}.
*
* @param n Node.
* @param cacheName Cache name.
- * @return New instance of {@link StopRebuildIndexConsumer}.
+ * @return New instance of {@link StopBuildIndexConsumer}.
*/
- protected StopRebuildIndexConsumer addStopRebuildIndexConsumer(IgniteEx n, String cacheName) {
- StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+ protected StopBuildIndexConsumer addStopRebuildIndexConsumer(IgniteEx n, String cacheName) {
+ StopBuildIndexConsumer consumer = new StopBuildIndexConsumer(getTestTimeout());
- addCacheRowConsumer(nodeName(n), cacheName, stopRebuildIdxConsumer);
+ addCacheRowConsumer(nodeName(n), cacheName, consumer);
- return stopRebuildIdxConsumer;
+ return consumer;
}
/**
- * Registering a {@link BreakRebuildIndexConsumer} for cache.
+ * Registering a {@link BreakBuildIndexConsumer} to {@link IndexesRebuildTaskEx#addCacheRowConsumer}.
*
* @param n Node.
* @param cacheName Cache name.
- * @param breakPred Predicate for throwing an {@link IgniteCheckedException}.
- * @return New instance of {@link BreakRebuildIndexConsumer}.
+ * @param breakCnt Count of rows processed, after which an {@link IgniteCheckedException} will be thrown.
+ * @return New instance of {@link BreakBuildIndexConsumer}.
*/
- protected BreakRebuildIndexConsumer addBreakRebuildIndexConsumer(
+ protected BreakBuildIndexConsumer addBreakRebuildIndexConsumer(IgniteEx n, String cacheName, int breakCnt) {
+ BreakBuildIndexConsumer consumer = new BreakBuildIndexConsumer(
+ getTestTimeout(),
+ (c, r) -> c.visitCnt.get() >= breakCnt
+ );
+
+ addCacheRowConsumer(nodeName(n), cacheName, consumer);
+
+ return consumer;
+ }
+
+ /**
+ * Registering a {@link SlowdownBuildIndexConsumer} to {@link IndexesRebuildTaskEx#addCacheRowConsumer}.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @param sleepTime Sleep time after processing each cache row in milliseconds.
+ * @return New instance of {@link SlowdownBuildIndexConsumer}.
+ */
+ protected SlowdownBuildIndexConsumer addSlowdownRebuildIndexConsumer(
IgniteEx n,
String cacheName,
- IgniteThrowableBiPredicate<BreakRebuildIndexConsumer, CacheDataRow> breakPred
+ long sleepTime
) {
- BreakRebuildIndexConsumer breakRebuildIdxConsumer = new BreakRebuildIndexConsumer(getTestTimeout(), breakPred);
+ SlowdownBuildIndexConsumer consumer = new SlowdownBuildIndexConsumer(getTestTimeout(), sleepTime);
- addCacheRowConsumer(nodeName(n), cacheName, breakRebuildIdxConsumer);
+ addCacheRowConsumer(nodeName(n), cacheName, consumer);
- return breakRebuildIdxConsumer;
+ return consumer;
+ }
+
+ /**
+ * Registering a {@link SlowdownBuildIndexConsumer} to {@link IgniteH2IndexingEx#addIdxCreateCacheRowConsumer}.
+ *
+ * @param n Node.
+ * @param idxName Index name.
+ * @param sleepTime Sleep time after processing each cache row in milliseconds.
+ * @return New instance of {@link SlowdownBuildIndexConsumer}.
+ */
+ protected SlowdownBuildIndexConsumer addSlowdownIdxCreateConsumer(IgniteEx n, String idxName, long sleepTime) {
+ SlowdownBuildIndexConsumer consumer = new SlowdownBuildIndexConsumer(getTestTimeout(), sleepTime);
+
+ addIdxCreateCacheRowConsumer(nodeName(n), idxName, consumer);
+
+ return consumer;
+ }
+
+ /**
+ * Registering a {@link BreakBuildIndexConsumer} to {@link IgniteH2IndexingEx#addIdxCreateCacheRowConsumer}.
+ *
+ * @param n Node.
+ * @param idxName Index name.
+ * @param breakCnt Count of rows processed, after which an {@link IgniteCheckedException} will be thrown.
+ * @return New instance of {@link BreakBuildIndexConsumer}.
+ */
+ protected BreakBuildIndexConsumer addBreakIdxCreateConsumer(IgniteEx n, String idxName, int breakCnt) {
+ BreakBuildIndexConsumer consumer = new BreakBuildIndexConsumer(
+ getTestTimeout(),
+ (c, r) -> c.visitCnt.get() >= breakCnt
+ );
+
+ addIdxCreateCacheRowConsumer(nodeName(n), idxName, consumer);
+
+ return consumer;
}
/**
@@ -215,4 +275,24 @@ public abstract class AbstractRebuildIndexTest extends GridCommonAbstractTest {
for (int i = 0; i < cnt; i++)
cache.put(i, new Person(i, "name_" + i));
}
+
+ /**
+ * Getting {@code GridQueryProcessor#idxBuildStatusStorage}.
+ *
+ * @param n Node.
+ * @return Index build status storage.
+ */
+ protected IndexBuildStatusStorage indexBuildStatusStorage(IgniteEx n) {
+ return getFieldValue(n.context().query(), "idxBuildStatusStorage");
+ }
+
+ /**
+ * Getting {@code IndexBuildStatusStorage#statuses}.
+ *
+ * @param n Node.
+ * @return Index build status storage.
+ */
+ protected ConcurrentMap<String, IndexBuildStatusHolder> statuses(IgniteEx n) {
+ return getFieldValue(indexBuildStatusStorage(n), "statuses");
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
index 60cf450..1c61d3b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
@@ -21,7 +21,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.StopBuildIndexConsumer;
import org.apache.ignite.internal.processors.query.aware.IndexRebuildFutureStorage;
import org.apache.ignite.internal.util.typedef.F;
import org.junit.Test;
@@ -49,21 +49,21 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
// The forced rebuild has begun - no rejected.
assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
IgniteInternalFuture<?> idxRebFut0 = checkStartRebuildIndexes(n, cacheCtx);
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
assertFalse(idxRebFut0.isDone());
// There will be no forced rebuilding since the previous one has not ended - they will be rejected.
assertEqualsCollections(F.asList(cacheCtx), forceRebuildIndexes(n, cacheCtx));
assertTrue(idxRebFut0 == indexRebuildFuture(n, cacheCtx.cacheId()));
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut0.get(getTestTimeout());
@@ -77,10 +77,10 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
IgniteInternalFuture<?> idxRebFut1 = checkStartRebuildIndexes(n, cacheCtx);
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
assertFalse(idxRebFut1.isDone());
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut1.get(getTestTimeout());
checkFinishRebuildIndexes(n, cacheCtx, 100);
@@ -102,13 +102,13 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
prepareBeforeNodeStart();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, DEFAULT_CACHE_NAME);
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, DEFAULT_CACHE_NAME);
n = startGrid(0);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
IgniteInternalFuture<?> idxRebFut0 = checkStartRebuildIndexes(n, cacheCtx);
checkRebuildAfterExchange(n, cacheCtx.cacheId(), true);
@@ -118,7 +118,7 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
assertTrue(idxRebFut0 == indexRebuildFuture(n, cacheCtx.cacheId()));
checkRebuildAfterExchange(n, cacheCtx.cacheId(), true);
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut0.get(getTestTimeout());
@@ -134,10 +134,10 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
IgniteInternalFuture<?> idxRebFut1 = checkStartRebuildIndexes(n, cacheCtx);
checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
assertFalse(idxRebFut1.isDone());
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut1.get(getTestTimeout());
checkFinishRebuildIndexes(n, cacheCtx, 100);
@@ -160,13 +160,13 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
prepareBeforeNodeStart();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, DEFAULT_CACHE_NAME);
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, DEFAULT_CACHE_NAME);
n = startGrid(0);
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
IgniteInternalFuture<?> idxRebFut = checkStartRebuildIndexes(n, cacheCtx);
@@ -175,7 +175,7 @@ public class ForceRebuildIndexTest extends AbstractRebuildIndexTest {
assertTrue(idxRebFut == indexRebuildFuture(n, cacheCtx.cacheId()));
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut.get(getTestTimeout());
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IgniteH2IndexingEx.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IgniteH2IndexingEx.java
new file mode 100644
index 0000000..b845d85
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IgniteH2IndexingEx.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.DO_NOTHING_CACHE_DATA_ROW_CONSUMER;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.nodeName;
+
+/**
+ * Extension {@link IgniteH2Indexing} for the tests.
+ */
+public class IgniteH2IndexingEx extends IgniteH2Indexing {
+ /**
+ * Consumer for cache rows when creating an index on a node.
+ * Mapping: Node name -> Index name -> Consumer.
+ */
+ private static final Map<String, Map<String, IgniteThrowableConsumer<CacheDataRow>>> idxCreateCacheRowConsumer =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Set {@link IgniteH2IndexingEx} to {@link GridQueryProcessor#idxCls} before starting the node.
+ */
+ static void prepareBeforeNodeStart() {
+ GridQueryProcessor.idxCls = IgniteH2IndexingEx.class;
+ }
+
+ /**
+ * Cleaning of internal structures. It is recommended to clean at
+ * {@code GridCommonAbstractTest#beforeTest} and {@code GridCommonAbstractTest#afterTest}.
+ *
+ * @param nodeNamePrefix Prefix of node name ({@link GridKernalContext#igniteInstanceName()})
+ * for which internal structures will be removed, if {@code null} will be removed for all.
+ *
+ * @see GridCommonAbstractTest#getTestIgniteInstanceName()
+ */
+ static void clean(@Nullable String nodeNamePrefix) {
+ if (nodeNamePrefix == null)
+ idxCreateCacheRowConsumer.clear();
+ else
+ idxCreateCacheRowConsumer.entrySet().removeIf(e -> e.getKey().startsWith(nodeNamePrefix));
+ }
+
+ /**
+ * Registering a consumer for cache rows when creating an index on a node.
+ *
+ * @param nodeName The name of the node,
+ * the value of which will return {@link GridKernalContext#igniteInstanceName()}.
+ * @param idxName Index name.
+ * @param c Cache row consumer.
+ *
+ * @see IndexingTestUtils#nodeName(GridKernalContext)
+ * @see IndexingTestUtils#nodeName(IgniteEx)
+ * @see IndexingTestUtils#nodeName(GridCacheContext)
+ * @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
+ */
+ static void addIdxCreateCacheRowConsumer(
+ String nodeName,
+ String idxName,
+ IgniteThrowableConsumer<CacheDataRow> c
+ ) {
+ idxCreateCacheRowConsumer.computeIfAbsent(nodeName, s -> new ConcurrentHashMap<>()).put(idxName, c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dynamicIndexCreate(
+ String schemaName,
+ String tblName,
+ QueryIndexDescriptorImpl idxDesc,
+ boolean ifNotExists,
+ SchemaIndexCacheVisitor cacheVisitor
+ ) throws IgniteCheckedException {
+ super.dynamicIndexCreate(schemaName, tblName, idxDesc, ifNotExists, clo -> {
+ cacheVisitor.visit(row -> {
+ idxCreateCacheRowConsumer
+ .getOrDefault(nodeName(ctx), emptyMap())
+ .getOrDefault(idxDesc.name(), DO_NOTHING_CACHE_DATA_ROW_CONSUMER)
+ .accept(row);
+
+ clo.apply(row);
+ });
+ });
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
index 2596cf0..d182b46 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.index;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
@@ -31,12 +30,13 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyMap;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.DO_NOTHING_CACHE_DATA_ROW_CONSUMER;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.nodeName;
/**
* Extension {@link IndexesRebuildTask} for the tests.
@@ -65,8 +65,10 @@ class IndexesRebuildTaskEx extends IndexesRebuildTask {
super.startRebuild(cctx, rebuildIdxFut, new SchemaIndexCacheVisitorClosure() {
/** {@inheritDoc} */
@Override public void apply(CacheDataRow row) throws IgniteCheckedException {
- cacheRowConsumer.getOrDefault(nodeName(cctx), emptyMap())
- .getOrDefault(cctx.name(), r -> { }).accept(row);
+ cacheRowConsumer
+ .getOrDefault(nodeName(cctx), emptyMap())
+ .getOrDefault(cctx.name(), DO_NOTHING_CACHE_DATA_ROW_CONSUMER)
+ .accept(row);
clo.apply(row);
}
@@ -115,9 +117,9 @@ class IndexesRebuildTaskEx extends IndexesRebuildTask {
* @param cacheName Cache name.
* @param c Cache row consumer.
*
- * @see #nodeName(GridKernalContext)
- * @see #nodeName(IgniteEx)
- * @see #nodeName(GridCacheContext)
+ * @see IndexingTestUtils#nodeName(GridKernalContext)
+ * @see IndexingTestUtils#nodeName(IgniteEx)
+ * @see IndexingTestUtils#nodeName(GridCacheContext)
* @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
*/
static void addCacheRowConsumer(String nodeName, String cacheName, IgniteThrowableConsumer<CacheDataRow> c) {
@@ -132,120 +134,12 @@ class IndexesRebuildTaskEx extends IndexesRebuildTask {
* @param cacheName Cache name.
* @param r A function that should run before preparing to rebuild the cache indexes.
*
- * @see #nodeName(GridKernalContext)
- * @see #nodeName(IgniteEx)
- * @see #nodeName(GridCacheContext)
+ * @see IndexingTestUtils#nodeName(GridKernalContext)
+ * @see IndexingTestUtils#nodeName(IgniteEx)
+ * @see IndexingTestUtils#nodeName(GridCacheContext)
* @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
*/
static void addCacheRebuildRunner(String nodeName, String cacheName, Runnable r) {
cacheRebuildRunner.computeIfAbsent(nodeName, s -> new ConcurrentHashMap<>()).put(cacheName, r);
}
-
- /**
- * Getting local instance name of the node.
- *
- * @param cacheCtx Cache context.
- * @return Local instance name.
- */
- static String nodeName(GridCacheContext cacheCtx) {
- return nodeName(cacheCtx.kernalContext());
- }
-
- /**
- * Getting local instance name of the node.
- *
- * @param n Node.
- * @return Local instance name.
- */
- static String nodeName(IgniteEx n) {
- return nodeName(n.context());
- }
-
- /**
- * Getting local instance name of the node.
- *
- * @param kernalCtx Kernal context.
- * @return Local instance name.
- */
- static String nodeName(GridKernalContext kernalCtx) {
- return kernalCtx.igniteInstanceName();
- }
-
- /**
- * Consumer for stopping rebuilding indexes of cache.
- */
- static class StopRebuildIndexConsumer implements IgniteThrowableConsumer<CacheDataRow> {
- /** Future to indicate that the rebuilding indexes has begun. */
- final GridFutureAdapter<Void> startRebuildIdxFut = new GridFutureAdapter<>();
-
- /** Future to wait to continue rebuilding indexes. */
- final GridFutureAdapter<Void> finishRebuildIdxFut = new GridFutureAdapter<>();
-
- /** Counter of visits. */
- final AtomicLong visitCnt = new AtomicLong();
-
- /** The maximum time to wait finish future in milliseconds. */
- final long timeout;
-
- /**
- * Constructor.
- *
- * @param timeout The maximum time to wait finish future in milliseconds.
- */
- StopRebuildIndexConsumer(long timeout) {
- this.timeout = timeout;
- }
-
- /** {@inheritDoc} */
- @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
- startRebuildIdxFut.onDone();
-
- visitCnt.incrementAndGet();
-
- finishRebuildIdxFut.get(timeout);
- }
-
- /**
- * Resetting internal futures.
- */
- void resetFutures() {
- startRebuildIdxFut.reset();
- finishRebuildIdxFut.reset();
- }
- }
-
- /**
- * Consumer breaking index rebuild for the cache.
- */
- static class BreakRebuildIndexConsumer extends StopRebuildIndexConsumer {
- /** Predicate for throwing an {@link IgniteCheckedException}. */
- final IgniteThrowableBiPredicate<BreakRebuildIndexConsumer, CacheDataRow> brakePred;
-
- /**
- * Constructor.
- *
- * @param timeout The maximum time to wait finish future in milliseconds.
- * @param brakePred Predicate for throwing an {@link IgniteCheckedException}.
- */
- BreakRebuildIndexConsumer(
- long timeout,
- IgniteThrowableBiPredicate<BreakRebuildIndexConsumer, CacheDataRow> brakePred
- ) {
- super(timeout);
-
- this.brakePred = brakePred;
- }
-
- /** {@inheritDoc} */
- @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
- startRebuildIdxFut.onDone();
-
- finishRebuildIdxFut.get(timeout);
-
- visitCnt.incrementAndGet();
-
- if (brakePred.test(this, row))
- throw new IgniteCheckedException("From test.");
- }
- }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexingTestUtils.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexingTestUtils.java
new file mode 100644
index 0000000..e98cf94
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexingTestUtils.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Utility class for indexing.
+ */
+class IndexingTestUtils {
+ /** A do-nothing {@link CacheDataRow} consumer. */
+ static final IgniteThrowableConsumer<CacheDataRow> DO_NOTHING_CACHE_DATA_ROW_CONSUMER = row -> {
+ };
+
+ /**
+ * Private constructor.
+ */
+ private IndexingTestUtils() {
+ // No-op.
+ }
+
+ /**
+ * Getting local instance name of the node.
+ *
+ * @param kernalCtx Kernal context.
+ * @return Local instance name.
+ */
+ static String nodeName(GridKernalContext kernalCtx) {
+ return kernalCtx.igniteInstanceName();
+ }
+
+ /**
+ * Getting local instance name of the node.
+ *
+ * @param n Node.
+ * @return Local instance name.
+ */
+ static String nodeName(IgniteEx n) {
+ return nodeName(n.context());
+ }
+
+ /**
+ * Getting local instance name of the node.
+ *
+ * @param cacheCtx Cache context.
+ * @return Local instance name.
+ */
+ static String nodeName(GridCacheContext cacheCtx) {
+ return nodeName(cacheCtx.kernalContext());
+ }
+
+ /**
+ * Consumer for stopping building indexes of cache.
+ */
+ static class StopBuildIndexConsumer implements IgniteThrowableConsumer<CacheDataRow> {
+ /** Future to indicate that the building indexes has begun. */
+ final GridFutureAdapter<Void> startBuildIdxFut = new GridFutureAdapter<>();
+
+ /** Future to wait to continue building indexes. */
+ final GridFutureAdapter<Void> finishBuildIdxFut = new GridFutureAdapter<>();
+
+ /** Counter of visits. */
+ final AtomicLong visitCnt = new AtomicLong();
+
+ /** The maximum time to wait finish future in milliseconds. */
+ final long timeout;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout The maximum time to wait finish future in milliseconds.
+ */
+ StopBuildIndexConsumer(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+ startBuildIdxFut.onDone();
+
+ visitCnt.incrementAndGet();
+
+ finishBuildIdxFut.get(timeout);
+ }
+
+ /**
+ * Resetting internal futures.
+ */
+ void resetFutures() {
+ startBuildIdxFut.reset();
+ finishBuildIdxFut.reset();
+ }
+ }
+
+ /**
+ * Consumer for slowdown building indexes of cache.
+ */
+ static class SlowdownBuildIndexConsumer extends StopBuildIndexConsumer {
+ /** Sleep time after processing each cache row in milliseconds. */
+ final AtomicLong sleepTime;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout The maximum time to wait finish future in milliseconds.
+ * @param sleepTime Sleep time after processing each cache row in milliseconds.
+ */
+ SlowdownBuildIndexConsumer(long timeout, long sleepTime) {
+ super(timeout);
+
+ this.sleepTime = new AtomicLong(sleepTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+ super.accept(row);
+
+ long sleepTime = this.sleepTime.get();
+
+ if (sleepTime > 0)
+ U.sleep(sleepTime);
+ }
+ }
+
+ /**
+ * Consumer breaking index building for the cache.
+ */
+ static class BreakBuildIndexConsumer extends StopBuildIndexConsumer {
+ /** Predicate for throwing an {@link IgniteCheckedException}. */
+ final IgniteThrowableBiPredicate<BreakBuildIndexConsumer, CacheDataRow> brakePred;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout The maximum time to wait finish future in milliseconds.
+ * @param brakePred Predicate for throwing an {@link IgniteCheckedException}.
+ */
+ BreakBuildIndexConsumer(
+ long timeout,
+ IgniteThrowableBiPredicate<BreakBuildIndexConsumer, CacheDataRow> brakePred
+ ) {
+ super(timeout);
+
+ this.brakePred = brakePred;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+ super.accept(row);
+
+ if (brakePred.test(this, row))
+ throw new IgniteCheckedException("From test.");
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java
new file mode 100644
index 0000000..20bee5f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.BreakBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.SlowdownBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.StopBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.QueryIndexKey;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEX_REBUILD_BATCH_SIZE;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.COMPLETE;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.INIT;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage.KEY_PREFIX;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * Test to check consistency when adding a new index.
+ */
+@WithSystemProperty(key = IGNITE_INDEX_REBUILD_BATCH_SIZE, value = "1")
+public class ResumeCreateIndexTest extends AbstractRebuildIndexTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setCacheConfiguration(
+ cacheCfg(DEFAULT_CACHE_NAME, null).setAffinity(new RendezvousAffinityFunction(false, 1))
+ );
+ }
+
+ /**
+ * Checking the general flow of building a new index.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGeneralFlow() throws Exception {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ IgniteEx n = prepareNodeToCreateNewIndex(cacheName, 10, true);
+
+ String idxName = "IDX0";
+ SlowdownBuildIndexConsumer slowdownIdxCreateConsumer = addSlowdownIdxCreateConsumer(n, idxName, 0);
+
+ IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(n.cache(cacheName), idxName);
+
+ slowdownIdxCreateConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, false, 1);
+
+ slowdownIdxCreateConsumer.finishBuildIdxFut.onDone();
+ createIdxFut.get(getTestTimeout());
+
+ checkCompletedStatus(n, cacheName);
+
+ enableCheckpointsAsync(n, getTestIgniteInstanceName(), true).get(getTestTimeout());
+
+ checkNoStatus(n, cacheName);
+ }
+
+ /**
+ * Checks that if there is no checkpoint after the index is created and the
+ * node is restarted, the indexes will be rebuilt.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoCheckpointAfterIndexCreation() throws Exception {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ IgniteEx n = prepareNodeToCreateNewIndex(cacheName, 10, true);
+
+ String idxName = "IDX0";
+ SlowdownBuildIndexConsumer slowdownIdxCreateConsumer = addSlowdownIdxCreateConsumer(n, idxName, 0);
+
+ IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(n.cache(cacheName), idxName);
+
+ slowdownIdxCreateConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, false, 1);
+
+ slowdownIdxCreateConsumer.finishBuildIdxFut.onDone();
+ createIdxFut.get(getTestTimeout());
+
+ checkCompletedStatus(n, cacheName);
+
+ stopGrid(0);
+
+ IndexesRebuildTaskEx.prepareBeforeNodeStart();
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheName);
+
+ n = startGrid(0);
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, CU.cacheId(cacheName));
+ assertNotNull(idxRebFut);
+
+ checkInitStatus(n, cacheName, true, 0);
+ assertTrue(allIndexes(n).containsKey(new QueryIndexKey(cacheName, idxName)));
+
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
+ idxRebFut.get(getTestTimeout());
+
+ forceCheckpoint();
+
+ checkNoStatus(n, cacheName);
+ assertEquals(10, selectPersonByName(n.cache(cacheName)).size());
+ }
+
+ /**
+ * Checks that if errors occur while building a new index, then there will be no rebuilding of the indexes.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testErrorFlow() throws Exception {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ IgniteEx n = prepareNodeToCreateNewIndex(cacheName, 10, true);
+
+ String idxName = "IDX0";
+ BreakBuildIndexConsumer breakBuildIdxConsumer = addBreakIdxCreateConsumer(n, idxName, 1);
+
+ IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(n.cache(cacheName), idxName);
+
+ breakBuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, false, 1);
+
+ breakBuildIdxConsumer.finishBuildIdxFut.onDone();
+ assertThrows(log, () -> createIdxFut.get(getTestTimeout()), IgniteCheckedException.class, null);
+
+ checkCompletedStatus(n, cacheName);
+
+ enableCheckpointsAsync(n, getTestIgniteInstanceName(), true).get(getTestTimeout());
+
+ checkNoStatus(n, cacheName);
+ }
+
+ /**
+ * Checks that building a new index and rebuilding indexes at the same time
+ * does not break the {@link IndexBuildStatusHolder}.
+ * In this case, building a new index is completed earlier.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConcurrentBuildNewIndexAndRebuildIndexes0() throws Exception {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ IgniteEx n = prepareNodeToCreateNewIndex(cacheName, 100_000, true);
+
+ String idxName = "IDX0";
+ SlowdownBuildIndexConsumer slowdownIdxCreateConsumer = addSlowdownIdxCreateConsumer(n, idxName, 0);
+
+ IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(n.cache(cacheName), idxName);
+
+ slowdownIdxCreateConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, false, 1);
+
+ SlowdownBuildIndexConsumer slowdownRebuildIdxConsumer = addSlowdownRebuildIndexConsumer(n, cacheName, 100);
+ assertTrue(forceRebuildIndexes(n, n.cachex(cacheName).context()).isEmpty());
+
+ checkInitStatus(n, cacheName, true, 1);
+
+ IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, CU.cacheId(cacheName));
+ assertNotNull(idxRebFut);
+
+ slowdownIdxCreateConsumer.finishBuildIdxFut.onDone();
+
+ slowdownRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ slowdownRebuildIdxConsumer.finishBuildIdxFut.onDone();
+
+ createIdxFut.get(getTestTimeout());
+
+ assertFalse(idxRebFut.isDone());
+ checkInitStatus(n, cacheName, true, 0);
+
+ slowdownRebuildIdxConsumer.sleepTime.set(0);
+ idxRebFut.get(getTestTimeout());
+
+ checkCompletedStatus(n, cacheName);
+
+ enableCheckpointsAsync(n, getTestIgniteInstanceName(), true).get(getTestTimeout());
+
+ checkNoStatus(n, cacheName);
+ }
+
+ /**
+ * Checks that building a new index and rebuilding indexes at the same time
+ * does not break the {@link IndexBuildStatusHolder}.
+ * In this case, rebuilding indexes is completed earlier.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConcurrentBuildNewIndexAndRebuildIndexes1() throws Exception {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ IgniteEx n = prepareNodeToCreateNewIndex(cacheName, 100_000, true);
+
+ SlowdownBuildIndexConsumer slowdownRebuildIdxConsumer = addSlowdownRebuildIndexConsumer(n, cacheName, 10);
+ assertTrue(forceRebuildIndexes(n, n.cachex(cacheName).context()).isEmpty());
+
+ checkInitStatus(n, cacheName, true, 0);
+
+ slowdownRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, CU.cacheId(cacheName));
+ assertNotNull(idxRebFut);
+
+ String idxName = "IDX0";
+ SlowdownBuildIndexConsumer slowdownIdxCreateConsumer = addSlowdownIdxCreateConsumer(n, idxName, 100);
+
+ IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(n.cache(cacheName), idxName);
+
+ slowdownRebuildIdxConsumer.finishBuildIdxFut.onDone();
+ slowdownIdxCreateConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, true, 1);
+
+ slowdownIdxCreateConsumer.finishBuildIdxFut.onDone();
+ slowdownRebuildIdxConsumer.sleepTime.set(0);
+ idxRebFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, false, 1);
+
+ slowdownIdxCreateConsumer.sleepTime.set(0);
+ createIdxFut.get(getTestTimeout());
+
+ checkCompletedStatus(n, cacheName);
+
+ enableCheckpointsAsync(n, getTestIgniteInstanceName(), true).get(getTestTimeout());
+
+ checkNoStatus(n, cacheName);
+ }
+
+ /**
+ * Checks that if a checkpoint fails after building a new index and the
+ * node restarts, then the indexes will be rebuilt.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testPartialCheckpointNewIndexRows() throws Exception {
+ String cacheName = DEFAULT_CACHE_NAME;
+
+ IgniteEx n = prepareNodeToCreateNewIndex(cacheName, 100_000, false);
+
+ String idxName = "IDX0";
+ SlowdownBuildIndexConsumer slowdownIdxCreateConsumer = addSlowdownIdxCreateConsumer(n, idxName, 10);
+
+ IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(n.cache(cacheName), idxName);
+
+ slowdownIdxCreateConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ checkInitStatus(n, cacheName, false, 1);
+
+ String reason = getTestIgniteInstanceName();
+ IgniteInternalFuture<Void> awaitBeforeCpBeginFut = awaitBeforeCheckpointBeginAsync(n, reason);
+ IgniteInternalFuture<Void> disableCpFut = enableCheckpointsAsync(n, reason, false);
+
+ awaitBeforeCpBeginFut.get(getTestTimeout());
+ slowdownIdxCreateConsumer.finishBuildIdxFut.onDone();
+
+ disableCpFut.get(getTestTimeout());
+ slowdownIdxCreateConsumer.sleepTime.set(0);
+
+ createIdxFut.get(getTestTimeout());
+
+ checkCompletedStatus(n, cacheName);
+
+ stopGrid(0);
+
+ IndexesRebuildTaskEx.prepareBeforeNodeStart();
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheName);
+
+ n = startGrid(0);
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+
+ IgniteInternalFuture<?> rebIdxFut = indexRebuildFuture(n, CU.cacheId(cacheName));
+ assertNotNull(rebIdxFut);
+
+ checkInitStatus(n, cacheName, true, 0);
+ assertTrue(allIndexes(n).containsKey(new QueryIndexKey(cacheName, idxName)));
+
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
+ rebIdxFut.get(getTestTimeout());
+
+ forceCheckpoint();
+
+ checkNoStatus(n, cacheName);
+ assertEquals(100_000, selectPersonByName(n.cache(cacheName)).size());
+ }
+
+ /**
+ * Asynchronous creation of a new index for the cache of {@link Person}.
+ * SQL: CREATE INDEX " + idxName + " ON Person(name)
+ *
+ * @param cache Cache.
+ * @param idxName Index name.
+ * @return Index creation future.
+ */
+ private IgniteInternalFuture<List<List<?>>> createIdxAsync(IgniteCache<Integer, Person> cache, String idxName) {
+ return runAsync(() -> {
+ String sql = "CREATE INDEX " + idxName + " ON Person(name)";
+
+ return cache.query(new SqlFieldsQuery(sql)).getAll();
+ });
+ }
+
+ /**
+ * Enable checkpoints asynchronously.
+ *
+ * @param n Node.
+ * @param reason Reason for checkpoint wakeup if it would be required.
+ * @param enable Enable/disable.
+ * @return Disable checkpoints future.
+ */
+ private IgniteInternalFuture<Void> enableCheckpointsAsync(IgniteEx n, String reason, boolean enable) {
+ return runAsync(() -> {
+ if (enable) {
+ dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+
+ forceCheckpoint(F.asList(n), reason);
+ }
+ else {
+ forceCheckpoint(F.asList(n), reason);
+
+ dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+ }
+
+ return null;
+ });
+ }
+
+ /**
+ * Waiting for a {@link CheckpointListener#beforeCheckpointBegin} asynchronously
+ * for a checkpoint for a specific reason.
+ *
+ * @param n Node.
+ * @param reason Checkpoint reason.
+ * @return Future for waiting for the {@link CheckpointListener#beforeCheckpointBegin}.
+ */
+ private IgniteInternalFuture<Void> awaitBeforeCheckpointBeginAsync(IgniteEx n, String reason) {
+ GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+ dbMgr(n).addCheckpointListener(new CheckpointListener() {
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ if (reason.equals(ctx.progress().reason()))
+ fut.onDone();
+ }
+ });
+
+ return fut;
+ }
+
+ /**
+ * Getting {@code GridQueryProcessor#idxs}.
+ *
+ * @param n Node.
+ * @return All indexes.
+ */
+ private Map<QueryIndexKey, QueryIndexDescriptorImpl> allIndexes(IgniteEx n) {
+ return getFieldValue(n.context().query(), "idxs");
+ }
+
+ /**
+ * Selection of all {@link Person} by name.
+ * SQL: SELECT * FROM Person where name LIKE 'name_%';
+ *
+ * @param cache Cache.
+ * @return List containing all query results.
+ */
+ private List<List<?>> selectPersonByName(IgniteCache<Integer, Person> cache) {
+ return cache.query(new SqlFieldsQuery("SELECT * FROM Person where name LIKE 'name_%';")).getAll();
+ }
+
+ /**
+ * Checking status.
+ *
+ * @param status Cache index build status.
+ * @param expStatus Expected status.
+ * @param expPersistent Expected persistence flag.
+ * @param expRebuild Expected rebuild flag.
+ * @param expNewIdx Expected count of new indexes being built.
+ */
+ private void checkStatus(
+ IndexBuildStatusHolder status,
+ Status expStatus,
+ boolean expPersistent,
+ boolean expRebuild,
+ int expNewIdx
+ ) {
+ assertEquals(expStatus, status.status());
+ assertEquals(expPersistent, status.persistent());
+ assertEquals(expRebuild, status.rebuild());
+ assertEquals(expNewIdx, status.buildNewIndexes());
+ }
+
+ /**
+ * Creating a node and filling the cache.
+ *
+ * @param cacheName Cache name.
+ * @param cnt Entry count.
+ * @param disableCp Disable checkpoint.
+ * @return New node.
+ * @throws Exception If failed.
+ */
+ private IgniteEx prepareNodeToCreateNewIndex(String cacheName, int cnt, boolean disableCp) throws Exception {
+ IgniteH2IndexingEx.prepareBeforeNodeStart();
+ IndexesRebuildTaskEx.prepareBeforeNodeStart();
+
+ IgniteEx n = startGrid(0);
+
+ populate(n.cache(cacheName), cnt);
+
+ if (disableCp)
+ enableCheckpointsAsync(n, getTestIgniteInstanceName(), false).get(getTestTimeout());
+
+ return n;
+ }
+
+ /**
+ * Checking {@link Status#INIT} status.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @param expRebuild Expected rebuild flag.
+ * @param expNewIdx Expected count of new indexes being built.
+ * @throws Exception If failed.
+ */
+ private void checkInitStatus(IgniteEx n, String cacheName, boolean expRebuild, int expNewIdx) throws Exception {
+ checkStatus(statuses(n).get(cacheName), INIT, true, expRebuild, expNewIdx);
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheName)));
+ assertEquals(!expRebuild, indexBuildStatusStorage(n).rebuildCompleted(cacheName));
+ }
+
+ /**
+ * Checking {@link Status#COMPLETE} status.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ private void checkCompletedStatus(IgniteEx n, String cacheName) throws Exception {
+ checkStatus(statuses(n).get(cacheName), COMPLETE, true, false, 0);
+ assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheName)));
+ assertTrue(indexBuildStatusStorage(n).rebuildCompleted(cacheName));
+ }
+
+ /**
+ * Checking for no status.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @throws Exception If failed.
+ */
+ private void checkNoStatus(IgniteEx n, String cacheName) throws Exception {
+ assertNull(statuses(n).get(cacheName));
+ assertNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheName)));
+ assertTrue(indexBuildStatusStorage(n).rebuildCompleted(cacheName));
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java
index f133122..6869cde 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeRebuildIndexTest.java
@@ -23,26 +23,26 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.BreakRebuildIndexConsumer;
-import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
-import org.apache.ignite.internal.processors.query.aware.IndexRebuildStateStorage;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.BreakBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.StopBuildIndexConsumer;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder;
+import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage;
import org.apache.ignite.internal.util.function.ThrowableFunction;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.prepareBeforeNodeStart;
-import static org.apache.ignite.internal.processors.query.aware.IndexRebuildStateStorage.KEY_PREFIX;
+import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage.KEY_PREFIX;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.deleteCacheGrpDir;
-import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
/**
* Class for testing rebuilding index resumes.
*/
public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
/**
- * Checking normal flow for {@link IndexRebuildStateStorage}.
+ * Checking normal flow for {@link IndexBuildStatusStorage}.
*
* @throws Exception If failed.
*/
@@ -56,23 +56,23 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
IgniteInternalFuture<?> idxRebFut = indexRebuildFuture(n, cacheCtx.cacheId());
- assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertFalse(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut.get(getTestTimeout());
assertEquals(1_000, stopRebuildIdxConsumer.visitCnt.get());
- assertTrue(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertTrue(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
forceCheckpoint();
@@ -81,7 +81,7 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
}
/**
- * Checking the flow in case of an error for {@link IndexRebuildStateStorage}.
+ * Checking the flow in case of an error for {@link IndexBuildStatusStorage}.
*
* @throws Exception If failed.
*/
@@ -95,42 +95,41 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- BreakRebuildIndexConsumer breakRebuildIdxConsumer =
- addBreakRebuildIndexConsumer(n, cacheCtx.name(), (c, row) -> c.visitCnt.get() > 10);
+ BreakBuildIndexConsumer breakRebuildIdxConsumer = addBreakRebuildIndexConsumer(n, cacheCtx.name(), 10);
assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
IgniteInternalFuture<?> idxRebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
- assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertFalse(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
- breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
- breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ breakRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishBuildIdxFut.onDone();
assertThrows(log, () -> idxRebFut0.get(getTestTimeout()), Throwable.class, null);
assertTrue(breakRebuildIdxConsumer.visitCnt.get() < 1_000);
forceCheckpoint();
- assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertFalse(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
IgniteInternalFuture<?> idxRebFut1 = indexRebuildFuture(n, cacheCtx.cacheId());
- assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertFalse(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut1.get(getTestTimeout());
assertEquals(1_000, stopRebuildIdxConsumer.visitCnt.get());
- assertTrue(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertTrue(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
forceCheckpoint();
@@ -139,7 +138,7 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
}
/**
- * Checking the flow in case of an restart node for {@link IndexRebuildStateStorage}.
+ * Checking the flow in case of an restart node for {@link IndexBuildStatusStorage}.
*
* @throws Exception If failed.
*/
@@ -153,42 +152,41 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
- BreakRebuildIndexConsumer breakRebuildIdxConsumer =
- addBreakRebuildIndexConsumer(n, cacheCtx.name(), (c, row) -> c.visitCnt.get() > 10);
+ BreakBuildIndexConsumer breakRebuildIdxConsumer = addBreakRebuildIndexConsumer(n, cacheCtx.name(), 10);
assertTrue(forceRebuildIndexes(n, cacheCtx).isEmpty());
IgniteInternalFuture<?> idxRebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
- breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
- breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ breakRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishBuildIdxFut.onDone();
assertThrows(log, () -> idxRebFut0.get(getTestTimeout()), Throwable.class, null);
forceCheckpoint();
- assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertFalse(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
stopAllGrids();
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
prepareBeforeNodeStart();
n = startGrid(0);
- assertFalse(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertFalse(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
IgniteInternalFuture<?> idxRebFut1 = indexRebuildFuture(n, cacheCtx.cacheId());
dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
idxRebFut1.get(getTestTimeout());
assertEquals(1_000, stopRebuildIdxConsumer.visitCnt.get());
- assertTrue(indexRebuildStateStorage(n).completed(cacheCtx.name()));
+ assertTrue(indexBuildStatusStorage(n).rebuildCompleted(cacheCtx.name()));
assertNotNull(metaStorageOperation(n, metaStorage -> metaStorage.read(KEY_PREFIX + cacheCtx.name())));
dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
@@ -294,8 +292,7 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
populate(n1.getOrCreateCache(cacheCfg(cacheName, grpName)), 10_000);
- BreakRebuildIndexConsumer breakRebuildIdxConsumer =
- addBreakRebuildIndexConsumer(n1, cacheName, (c, r) -> c.visitCnt.get() > 10);
+ BreakBuildIndexConsumer breakRebuildIdxConsumer = addBreakRebuildIndexConsumer(n1, cacheName, 10);
IgniteInternalCache<?, ?> cachex = n1.cachex(cacheName);
@@ -305,25 +302,25 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
IgniteInternalFuture<?> rebIdxFut = indexRebuildFuture(n1, cachex.context().cacheId());
- breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
- breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ breakRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishBuildIdxFut.onDone();
assertThrows(log, () -> rebIdxFut.get(getTestTimeout()), Throwable.class, null);
assertTrue(breakRebuildIdxConsumer.visitCnt.get() < cacheSize);
}
n1.destroyCache(DEFAULT_CACHE_NAME + 0);
- assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 0));
+ assertTrue(indexBuildStatusStorage(n1).rebuildCompleted(DEFAULT_CACHE_NAME + 0));
n1.destroyCache(DEFAULT_CACHE_NAME + 1);
- assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 1));
+ assertTrue(indexBuildStatusStorage(n1).rebuildCompleted(DEFAULT_CACHE_NAME + 1));
- assertFalse(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 2));
- assertFalse(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 3));
+ assertFalse(indexBuildStatusStorage(n1).rebuildCompleted(DEFAULT_CACHE_NAME + 2));
+ assertFalse(indexBuildStatusStorage(n1).rebuildCompleted(DEFAULT_CACHE_NAME + 3));
forceCheckpoint(n1);
- ConcurrentMap<String, Object> states = getFieldValue(indexRebuildStateStorage(n1), "states");
+ ConcurrentMap<String, IndexBuildStatusHolder> states = statuses(n1);
assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 0));
assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 1));
@@ -344,12 +341,12 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
n1 = startGrid(getTestIgniteInstanceName(1));
- assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 2));
- assertTrue(indexRebuildStateStorage(n1).completed(DEFAULT_CACHE_NAME + 3));
+ assertTrue(indexBuildStatusStorage(n1).rebuildCompleted(DEFAULT_CACHE_NAME + 2));
+ assertTrue(indexBuildStatusStorage(n1).rebuildCompleted(DEFAULT_CACHE_NAME + 3));
forceCheckpoint(n1);
- states = getFieldValue(indexRebuildStateStorage(n1), "states");
+ states = statuses(n1);
assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 2));
assertFalse(states.containsKey(DEFAULT_CACHE_NAME + 3));
@@ -394,21 +391,20 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
GridCacheContext<?, ?> cacheCtx0 = cachex0.context();
GridCacheContext<?, ?> cacheCtx1 = cachex1.context();
- BreakRebuildIndexConsumer breakRebuildIdxConsumer =
- addBreakRebuildIndexConsumer(n, cacheCtx0.name(), (c, row) -> c.visitCnt.get() >= 10);
+ BreakBuildIndexConsumer breakRebuildIdxConsumer = addBreakRebuildIndexConsumer(n, cacheCtx0.name(), 10);
- StopRebuildIndexConsumer stopRebuildIdxConsumer0 = addStopRebuildIndexConsumer(n, cacheCtx1.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer0 = addStopRebuildIndexConsumer(n, cacheCtx1.name());
assertTrue(forceRebuildIndexes(n, cacheCtx0, cacheCtx1).isEmpty());
IgniteInternalFuture<?> rebIdxFut0 = indexRebuildFuture(n, cacheCtx0.cacheId());
IgniteInternalFuture<?> rebIdxFut1 = indexRebuildFuture(n, cacheCtx1.cacheId());
- breakRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
- breakRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ breakRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
+ breakRebuildIdxConsumer.finishBuildIdxFut.onDone();
- stopRebuildIdxConsumer0.startRebuildIdxFut.get(getTestTimeout());
- stopRebuildIdxConsumer0.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer0.startBuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer0.finishBuildIdxFut.onDone();
assertThrows(log, () -> rebIdxFut0.get(getTestTimeout()), Throwable.class, null);
assertTrue(breakRebuildIdxConsumer.visitCnt.get() < cacheSize0);
@@ -416,7 +412,7 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
rebIdxFut1.get(getTestTimeout());
assertEquals(cacheSize1, stopRebuildIdxConsumer0.visitCnt.get());
- StopRebuildIndexConsumer stopRebuildIdxConsumer1 = addStopRebuildIndexConsumer(n, cacheCtx0.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer1 = addStopRebuildIndexConsumer(n, cacheCtx0.name());
stopRebuildIdxConsumer0.resetFutures();
forceCheckpoint();
@@ -426,16 +422,16 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
IgniteInternalFuture<?> rebIdxFut01 = indexRebuildFuture(n, cacheCtx0.cacheId());
IgniteInternalFuture<?> rebIdxFut11 = indexRebuildFuture(n, cacheCtx1.cacheId());
- stopRebuildIdxConsumer1.startRebuildIdxFut.get(getTestTimeout());
- stopRebuildIdxConsumer1.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer1.startBuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer1.finishBuildIdxFut.onDone();
assertThrows(
log,
- () -> stopRebuildIdxConsumer0.startRebuildIdxFut.get(1_000),
+ () -> stopRebuildIdxConsumer0.startBuildIdxFut.get(1_000),
IgniteFutureTimeoutCheckedException.class,
null
);
- stopRebuildIdxConsumer0.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer0.finishBuildIdxFut.onDone();
rebIdxFut01.get(getTestTimeout());
assertEquals(cacheSize0, stopRebuildIdxConsumer1.visitCnt.get());
@@ -443,14 +439,4 @@ public class ResumeRebuildIndexTest extends AbstractRebuildIndexTest {
assertNull(rebIdxFut11);
assertEquals(cacheSize1, stopRebuildIdxConsumer0.visitCnt.get());
}
-
- /**
- * Getting {@code GridQueryProcessor#idxRebuildStateStorage}.
- *
- * @param n Node.
- * @return Index rebuild state storage.
- */
- private IndexRebuildStateStorage indexRebuildStateStorage(IgniteEx n) {
- return getFieldValue(n.context().query(), "idxRebuildStateStorage");
- }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
index 27ea528..48e0e28 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.StopBuildIndexConsumer;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheCompoundFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheStat;
@@ -38,8 +38,8 @@ import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRebuildRunner;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
-import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.prepareBeforeNodeStart;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.nodeName;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -149,7 +149,7 @@ public class StopRebuildIndexTest extends AbstractRebuildIndexTest {
() -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId()))
);
- StopRebuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
+ StopBuildIndexConsumer stopRebuildIdxConsumer = addStopRebuildIndexConsumer(n, cacheCtx.name());
forceRebuildIndexes(n, cacheCtx);
@@ -159,13 +159,13 @@ public class StopRebuildIndexTest extends AbstractRebuildIndexTest {
SchemaIndexCacheFuture rebFut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
assertNotNull(rebFut1);
- stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+ stopRebuildIdxConsumer.startBuildIdxFut.get(getTestTimeout());
assertFalse(rebFut0.isDone());
assertFalse(rebFut1.isDone());
assertFalse(rebFut1.cancelToken().isCancelled());
- stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+ stopRebuildIdxConsumer.finishBuildIdxFut.onDone();
rebFut0.get(getTestTimeout());
rebFut1.get(getTestTimeout());
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index 61e513b..c4cda6c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -265,6 +265,8 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
CacheGroupContext grp = grid(0).context().cache().cacheGroup(CU.cacheId(cacheName));
+ forceCheckpoint();
+
// Restart first time.
stopGrid(0);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
index d45ba0e..89ec92c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
@@ -71,6 +71,8 @@ public class IgniteClusterSnapshotWithIndexesTest extends AbstractSnapshotSelfTe
"primary key (id, name)) WITH \"cache_name=" + tblName + "\"");
executeSql(ignite, "CREATE INDEX ON " + tblName + "(city, age)");
+ forceCheckpoint();
+
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
executeSql(ignite, "INSERT INTO " + tblName + " (id, name, age, city) VALUES(?, 'name', 3, 'city')", i);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 982ed83..aeb0460 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexi
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
import org.apache.ignite.internal.processors.cache.index.ForceRebuildIndexTest;
+import org.apache.ignite.internal.processors.cache.index.ResumeCreateIndexTest;
import org.apache.ignite.internal.processors.cache.index.ResumeRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.index.StopRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsIndexingDefragmentationTest;
@@ -70,7 +71,8 @@ import org.junit.runners.Suite;
StopRebuildIndexTest.class,
ForceRebuildIndexTest.class,
IgniteClusterSnapshotRestoreWithIndexingTest.class,
- ResumeRebuildIndexTest.class
+ ResumeRebuildIndexTest.class,
+ ResumeCreateIndexTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}