You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/03/26 13:04:36 UTC
[ignite] branch master updated: IGNITE-14254 Graceful stop
rebuilding indexes on a cluster deactivation (#8837)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b3b7416 IGNITE-14254 Graceful stop rebuilding indexes on a cluster deactivation (#8837)
b3b7416 is described below
commit b3b741628079e07929142f067bf73ae54b917226
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Mar 26 16:04:14 2021 +0300
IGNITE-14254 Graceful stop rebuilding indexes on a cluster deactivation (#8837)
---
.../GridCommandHandlerIndexForceRebuildTest.java | 142 +++++--
.../GridCommandHandlerIndexRebuildStatusTest.java | 8 +-
.../processors/cache/tree/AbstractDataLeafIO.java | 2 +-
.../processors/query/GridQueryProcessor.java | 42 +-
.../schema/SchemaIndexCacheCompoundFuture.java | 49 +++
.../query/schema/SchemaIndexCacheFuture.java | 46 ++
.../schema/SchemaIndexCachePartitionWorker.java | 20 +-
.../query/schema/SchemaIndexCacheVisitorImpl.java | 7 +-
.../SchemaIndexOperationCancellationException.java | 36 ++
.../processors/query/h2/IgniteH2Indexing.java | 44 +-
.../cache/index/AbstractIndexingCommonTest.java | 6 +-
.../cache/index/StopRebuildIndexTest.java | 461 +++++++++++++++++++++
.../persistence/db/wal/IgniteWalRecoveryTest.java | 3 +-
.../query/h2/GridIndexRebuildSelfTest.java | 6 +-
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
15 files changed, 784 insertions(+), 92 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
index 733ec74..9cce435 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
@@ -19,22 +19,28 @@ package org.apache.ignite.util;
import java.util.Collection;
import java.util.Collections;
-import java.util.Random;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
@@ -47,6 +53,9 @@ import static java.lang.String.valueOf;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakSqlIndex;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.complexIndexEntity;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
@@ -87,10 +96,11 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
private static final int LAST_NODE_NUM = GRIDS_NUM - 1;
/**
- * Set containing names of caches for which index rebuild should be blocked.
- * See {@link BlockingIndexing}.
+ * Map for blocking index rebuilds in a {@link BlockingIndexing}.
+ * To stop blocking, remove the entry for the cache.
+ * Mapping: cache name -> future that is completed when the index rebuild of that cache starts blocking.
*/
- private static Set<String> cacheNamesBlockedIdxRebuild = new GridConcurrentHashSet<>();
+ private static final Map<String, GridFutureAdapter<Void>> blockRebuildIdx = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -122,7 +132,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
@Override protected void afterTest() throws Exception {
super.afterTest();
- cacheNamesBlockedIdxRebuild.clear();
+ blockRebuildIdx.clear();
}
/** */
@@ -196,7 +206,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
*/
@Test
public void testCacheNamesArg() throws Exception {
- cacheNamesBlockedIdxRebuild.add(CACHE_NAME_2_1);
+ blockRebuildIdx.put(CACHE_NAME_2_1, new GridFutureAdapter<>());
injectTestSystemOut();
@@ -215,7 +225,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--cache-names", CACHE_NAME_1_1 + "," + CACHE_NAME_2_1 + "," + CACHE_NAME_NON_EXISTING));
- cacheNamesBlockedIdxRebuild.remove(CACHE_NAME_2_1);
+ blockRebuildIdx.remove(CACHE_NAME_2_1);
waitForIndexesRebuild(grid(LAST_NODE_NUM));
@@ -230,7 +240,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
assertFalse(cache2Lsnr.check());
}
finally {
- cacheNamesBlockedIdxRebuild.remove(CACHE_NAME_2_1);
+ blockRebuildIdx.remove(CACHE_NAME_2_1);
for (int i = 0; i < GRIDS_NUM; i++) {
removeLogListener(grid(i), cache1Listeners[i]);
@@ -247,7 +257,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
*/
@Test
public void testGroupNamesArg() throws Exception {
- cacheNamesBlockedIdxRebuild.add(CACHE_NAME_1_2);
+ blockRebuildIdx.put(CACHE_NAME_1_2, new GridFutureAdapter<>());
injectTestSystemOut();
@@ -266,7 +276,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
"--node-id", grid(LAST_NODE_NUM).localNode().id().toString(),
"--group-names", GRP_NAME_1 + "," + GRP_NAME_2 + "," + GRP_NAME_NON_EXISTING));
- cacheNamesBlockedIdxRebuild.remove(CACHE_NAME_1_2);
+ blockRebuildIdx.remove(CACHE_NAME_1_2);
waitForIndexesRebuild(grid(LAST_NODE_NUM));
@@ -280,7 +290,7 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
assertFalse(cache2Lsnr.check());
}
finally {
- cacheNamesBlockedIdxRebuild.remove(CACHE_NAME_1_2);
+ blockRebuildIdx.remove(CACHE_NAME_1_2);
for (int i = 0; i < GRIDS_NUM; i++) {
removeLogListener(grid(i), cache1Listeners[i]);
@@ -315,8 +325,8 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
*/
@Test
public void testAsyncIndexesRebuild() throws IgniteInterruptedCheckedException {
- cacheNamesBlockedIdxRebuild.add(CACHE_NAME_1_1);
- cacheNamesBlockedIdxRebuild.add(CACHE_NAME_1_2);
+ blockRebuildIdx.put(CACHE_NAME_1_1, new GridFutureAdapter<>());
+ blockRebuildIdx.put(CACHE_NAME_1_2, new GridFutureAdapter<>());
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
"--node-id", grid(0).localNode().id().toString(),
@@ -332,65 +342,88 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
assertTrue("Failed to wait for index rebuild start for second cache.",
GridTestUtils.waitForCondition(() -> getActiveRebuildCaches(grid(0)).size() == 2, 10_000));
- cacheNamesBlockedIdxRebuild.clear();
+ blockRebuildIdx.clear();
assertTrue("Failed to wait for final index rebuild.", waitForIndexesRebuild(grid(0)));
}
/**
* Checks how index force rebuild command behaves when caches are under load.
+ *
+ * @throws Exception If failed.
*/
@Test
- public void testIndexRebuildUnderLoad() throws IgniteInterruptedCheckedException {
- IgniteEx ignite = grid(0);
+ public void testIndexRebuildUnderLoad() throws Exception {
+ IgniteEx n = grid(0);
AtomicBoolean stopLoad = new AtomicBoolean(false);
- Random rand = new Random();
+ String cacheName1 = "tmpCache1";
+ String cacheName2 = "tmpCache2";
- final String cacheName1 = "tmpCache1";
- final String cacheName2 = "tmpCache2";
- final String grpName = "tmpGrp";
+ List<String> caches = F.asList(cacheName1, cacheName2);
try {
- createAndFillCache(ignite, cacheName1, grpName);
- createAndFillCache(ignite, cacheName2, grpName);
+ for (String c : caches)
+ createAndFillCache(n, c, "tmpGrp");
- IgniteCache<Long, Person> cache1 = ignite.cache(cacheName1);
+ int cacheSize = n.cache(cacheName1).size();
- cacheNamesBlockedIdxRebuild.add(cacheName1);
- cacheNamesBlockedIdxRebuild.add(cacheName2);
+ for (String c : caches)
+ blockRebuildIdx.put(c, new GridFutureAdapter<>());
assertEquals(EXIT_CODE_OK, execute("--cache", "indexes_force_rebuild",
- "--node-id", ignite.localNode().id().toString(),
+ "--node-id", n.localNode().id().toString(),
"--cache-names", cacheName1 + "," + cacheName2));
- GridTestUtils.runAsync(() -> {
+ IgniteInternalFuture<?> putCacheFut = runAsync(() -> {
+ ThreadLocalRandom r = ThreadLocalRandom.current();
+
while (!stopLoad.get())
- cache1.put(rand.nextLong(), new Person(rand.nextInt(), valueOf(rand.nextLong())));
+ n.cache(cacheName1).put(r.nextInt(), new Person(r.nextInt(), valueOf(r.nextLong())));
});
- ignite.destroyCache(cacheName2);
+ assertTrue(waitForCondition(() -> n.cache(cacheName1).size() > cacheSize, getTestTimeout()));
- U.sleep(2000);
+ for (String c : caches) {
+ IgniteInternalFuture<?> rebIdxFut = n.context().query().indexRebuildFuture(CU.cacheId(c));
+ assertNotNull(rebIdxFut);
+ assertFalse(rebIdxFut.isDone());
+
+ blockRebuildIdx.get(c).get(getTestTimeout());
+ }
+
+ IgniteInternalFuture<Boolean> destroyCacheFut = n.context().cache()
+ .dynamicDestroyCache(cacheName2, false, true, false, null);
+
+ SchemaIndexCacheFuture intlRebIdxFut = schemaIndexCacheFuture(n, CU.cacheId(cacheName2));
+ assertNotNull(intlRebIdxFut);
+
+ assertTrue(waitForCondition(intlRebIdxFut.cancelToken()::isCancelled, getTestTimeout()));
stopLoad.set(true);
- cacheNamesBlockedIdxRebuild.clear();
+ blockRebuildIdx.clear();
- waitForIndexesRebuild(ignite);
+ waitForIndexesRebuild(n);
injectTestSystemOut();
assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "--check-crc", cacheName1));
assertContains(log, testOut.toString(), "no issues found.");
+
+ intlRebIdxFut.get(getTestTimeout());
+ destroyCacheFut.get(getTestTimeout());
+ putCacheFut.get(getTestTimeout());
}
finally {
stopLoad.set(true);
- ignite.destroyCache(cacheName1);
- ignite.destroyCache(cacheName2);
+ blockRebuildIdx.clear();
+
+ n.destroyCache(cacheName1);
+ n.destroyCache(cacheName2);
}
}
@@ -560,9 +593,13 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
*/
private static class BlockingIndexing extends IgniteH2Indexing {
/** {@inheritDoc} */
- @Override protected void rebuildIndexesFromHash0(GridCacheContext cctx, SchemaIndexCacheVisitorClosure clo, GridFutureAdapter<Void> rebuildIdxFut)
- {
- super.rebuildIndexesFromHash0(cctx, clo, new BlockingRebuildIdxFuture(rebuildIdxFut, cctx));
+ @Override protected void rebuildIndexesFromHash0(
+ GridCacheContext cctx,
+ SchemaIndexCacheVisitorClosure clo,
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexOperationCancellationToken cancel
+ ) {
+ super.rebuildIndexesFromHash0(cctx, clo, new BlockingRebuildIdxFuture(rebuildIdxFut, cctx), cancel);
}
}
@@ -585,8 +622,14 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
try {
- assertTrue("Failed to wait for indexes rebuild unblocking",
- GridTestUtils.waitForCondition(() -> !cacheNamesBlockedIdxRebuild.contains(cctx.name()), 60_000));
+ GridFutureAdapter<Void> fut = blockRebuildIdx.get(cctx.name());
+
+ if (fut != null) {
+ fut.onDone();
+
+ assertTrue("Failed to wait for indexes rebuild unblocking",
+ GridTestUtils.waitForCondition(() -> !blockRebuildIdx.containsKey(cctx.name()), 60_000));
+ }
}
catch (IgniteInterruptedCheckedException e) {
fail("Waiting for indexes rebuild unblocking was interrupted");
@@ -595,4 +638,19 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
return original.onDone(res, err);
}
}
+
+ /**
+ * Getting internal index rebuild future for cache.
+ *
+ * @param n Node.
+ * @param cacheId Cache id.
+ * @return Internal index rebuild future.
+ */
+ @Nullable private SchemaIndexCacheFuture schemaIndexCacheFuture(IgniteEx n, int cacheId) {
+ GridQueryIndexing indexing = n.context().query().getIndexing();
+
+ Map<Integer, SchemaIndexCacheFuture> idxRebuildFuts = getFieldValue(indexing, "idxRebuildFuts");
+
+ return idxRebuildFuts.get(cacheId);
+ }
}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
index 661ab93..9b391cd 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.visor.cache.index.IndexRebuildStatusInfoContainer;
@@ -245,13 +246,14 @@ public class GridCommandHandlerIndexRebuildStatusTest extends GridCommandHandler
@Override protected void rebuildIndexesFromHash0(
GridCacheContext cctx,
SchemaIndexCacheVisitorClosure clo,
- GridFutureAdapter<Void> rebuildIdxFut)
- {
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexOperationCancellationToken cancel
+ ) {
idxRebuildsStartedNum.incrementAndGet();
rebuildIdxFut.listen((CI1<IgniteInternalFuture<?>>)f -> idxRebuildsStartedNum.decrementAndGet());
- super.rebuildIndexesFromHash0(cctx, new BlockingSchemaIndexCacheVisitorClosure(clo), rebuildIdxFut);
+ super.rebuildIndexesFromHash0(cctx, new BlockingSchemaIndexCacheVisitorClosure(clo), rebuildIdxFut, cancel);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index f0e5c90e..0d2a5d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -167,7 +167,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
/** {@inheritDoc} */
@Override public final long getLink(long pageAddr, int idx) {
- assert idx < getCount(pageAddr) : idx;
+ assert idx < getCount(pageAddr) : "idx=" + idx + ", cnt=" + getCount(pageAddr);
return PageUtils.getLong(pageAddr, offset(idx));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 0ee93c5..1bd6606 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
@@ -112,6 +113,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridPlainOutClosure;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
@@ -2419,11 +2421,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Getting the cache object context.
+ *
* @param cacheName Cache name.
- * @return Cache object context.
+ * @return Cache object context, {@code null} if the cache was not found.
*/
- private CacheObjectContext cacheObjectContext(String cacheName) {
- return ctx.cache().internalCache(cacheName).context().cacheObjectContext();
+ @Nullable private CacheObjectContext cacheObjectContext(String cacheName) {
+ GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+ return cache == null ? null : cache.context().cacheObjectContext();
}
/**
@@ -2600,17 +2606,21 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ConstantConditions")
- private QueryTypeDescriptorImpl type(@Nullable String cacheName, CacheObject val) throws IgniteCheckedException {
- CacheObjectContext coctx = cacheObjectContext(cacheName);
-
+ private QueryTypeDescriptorImpl type(String cacheName, CacheObject val) throws IgniteCheckedException {
QueryTypeIdKey id;
boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
if (binaryVal)
id = new QueryTypeIdKey(cacheName, ctx.cacheObjects().typeId(val));
- else
+ else {
+ CacheObjectContext coctx = cacheObjectContext(cacheName);
+
+ if (coctx == null)
+ throw new IgniteCheckedException("Object context for cache not found: " + cacheName);
+
id = new QueryTypeIdKey(cacheName, val.value(coctx, false).getClass());
+ }
return types.get(id);
}
@@ -2863,7 +2873,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param supplier Code to be executed.
* @return Result.
*/
- private <T> T executeQuerySafe(@Nullable final GridCacheContext<?, ?> cctx, SupplierX<T> supplier) {
+ private <T> T executeQuerySafe(@Nullable final GridCacheContext<?, ?> cctx, GridPlainOutClosure<T> supplier) {
GridCacheContext oldCctx = curCache.get();
curCache.set(cctx);
@@ -2872,7 +2882,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- return supplier.get();
+ return supplier.apply();
}
catch (IgniteCheckedException e) {
throw new CacheException(e);
@@ -3787,18 +3797,4 @@ public class GridQueryProcessor extends GridProcessorAdapter {
this.mgr = mgr;
}
}
-
- /**
- * Function which can throw exception.
- */
- @FunctionalInterface
- private interface SupplierX<T> {
- /**
- * Get value.
- *
- * @return Value.
- * @throws IgniteCheckedException If failed.
- */
- T get() throws IgniteCheckedException;
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheCompoundFuture.java
new file mode 100644
index 0000000..16a83ca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheCompoundFuture.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+
+/**
+ * Compound index rebuilding future.
+ * Waits for all internal futures to complete, even if they throw exceptions.
+ * In this case, {@link #error()} will return the first thrown exception.
+ */
+public class SchemaIndexCacheCompoundFuture extends GridCompoundFuture<SchemaIndexCacheStat, SchemaIndexCacheStat> {
+ /** Container for the first index rebuild error. */
+ private final AtomicReference<Throwable> errRef = new AtomicReference<>();
+
+ /** {@inheritDoc} */
+ @Override protected boolean ignoreFailure(Throwable err) {
+ errRef.compareAndSet(null, err);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Throwable error() {
+ Throwable err0 = super.error();
+ Throwable err1 = errRef.get();
+
+ if (err0 != null && err1 != null)
+ err0.addSuppressed(err1);
+
+ return err0 != null ? err0 : err1;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFuture.java
new file mode 100644
index 0000000..28cfb5c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFuture.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * Extension of {@link GridFutureAdapter} for index rebuilding that carries a cancellation token.
+ */
+public class SchemaIndexCacheFuture extends GridFutureAdapter<Void> {
+ /** Token for canceling index rebuilding. */
+ private final SchemaIndexOperationCancellationToken cancelTok;
+
+ /**
+ * Constructor.
+ *
+ * @param cancelTok Token for canceling index rebuilding.
+ */
+ public SchemaIndexCacheFuture(SchemaIndexOperationCancellationToken cancelTok) {
+ this.cancelTok = cancelTok;
+ }
+
+ /**
+ * Getting token for canceling index rebuilding.
+ *
+ * @return Cancellation token.
+ */
+ public SchemaIndexOperationCancellationToken cancelToken() {
+ return cancelTok;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
index 18aa8f9..e6e9f16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java
@@ -147,7 +147,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
* @throws IgniteCheckedException If failed.
*/
private void processPartition() throws IgniteCheckedException {
- if (stop.get() || stopNode())
+ if (stop())
return;
checkCancelled();
@@ -174,7 +174,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
try {
int cntr = 0;
- while (cursor.next() && !stop.get() && !stopNode()) {
+ while (!stop() && cursor.next()) {
KeyCacheObject key = cursor.get().key();
if (!locked) {
@@ -221,7 +221,7 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
private void processKey(KeyCacheObject key) throws IgniteCheckedException {
assert nonNull(key);
- while (true) {
+ while (!stop()) {
try {
checkCancelled();
@@ -248,20 +248,20 @@ public class SchemaIndexCachePartitionWorker extends GridWorker {
/**
* Check if visit process is not cancelled.
*
- * @throws IgniteCheckedException If cancelled.
+ * @throws SchemaIndexOperationCancellationException If cancelled.
*/
- private void checkCancelled() throws IgniteCheckedException {
+ private void checkCancelled() throws SchemaIndexOperationCancellationException {
if (nonNull(cancel) && cancel.isCancelled())
- throw new IgniteCheckedException("Index creation was cancelled.");
+ throw new SchemaIndexOperationCancellationException("Index creation was cancelled.");
}
/**
- * Returns node in the process of stopping or not.
+ * Check if index rebuilding needs to be stopped.
*
- * @return {@code True} if node is in the process of stopping.
+ * @return {@code True} if necessary to stop rebuilding indexes.
*/
- private boolean stopNode() {
- return cctx.kernalContext().isStopping();
+ private boolean stop() {
+ return stop.get() || cctx.kernalContext().isStopping();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 4b519f2..9a28ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -61,7 +61,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
protected final GridFutureAdapter<Void> buildIdxFut;
/** Logger. */
- protected IgniteLogger log;
+ protected final IgniteLogger log;
/**
* Constructor.
@@ -110,8 +110,9 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
AtomicBoolean stop = new AtomicBoolean();
- GridCompoundFuture<SchemaIndexCacheStat, SchemaIndexCacheStat> buildIdxCompoundFut =
- new GridCompoundFuture<>();
+ // To avoid a race between clearing pageMemory (on a cache stop, e.g. deactivation)
+ // and rebuilding indexes, which can lead to a failure of the node.
+ SchemaIndexCacheCompoundFuture buildIdxCompoundFut = new SchemaIndexCacheCompoundFuture();
for (GridDhtLocalPartition locPart : locParts) {
GridWorkerFuture<SchemaIndexCacheStat> workerFut = new GridWorkerFuture<>();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationException.java
new file mode 100644
index 0000000..360fc01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Exception thrown when an index rebuild is cancelled via {@link SchemaIndexOperationCancellationToken}.
+ */
+public class SchemaIndexOperationCancellationException extends IgniteCheckedException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ *
+ * @param msg Error message.
+ */
+ public SchemaIndexOperationCancellationException(String msg) {
+ super(msg);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 06ca562..b7662776 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.cache.CacheException;
@@ -156,9 +157,12 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationException;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
@@ -338,6 +342,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** Functions manager. */
private FunctionsManager funcMgr;
+ /** Index rebuilding futures for caches. Mapping: cacheId -> rebuild indexes future. */
+ private final Map<Integer, SchemaIndexCacheFuture> idxRebuildFuts = new ConcurrentHashMap<>();
+
/**
* @return Kernal context.
*/
@@ -2086,9 +2093,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridFutureAdapter<Void> rebuildCacheIdxFut = new GridFutureAdapter<>();
- //to avoid possible data race
+ // To avoid possible data race.
GridFutureAdapter<Void> outRebuildCacheIdxFut = new GridFutureAdapter<>();
+ // An internal future for the ability to cancel index rebuilding.
+ // This behavior should be discussed in IGNITE-14321.
+ SchemaIndexCacheFuture intRebFut = new SchemaIndexCacheFuture(new SchemaIndexOperationCancellationToken());
+ cancelIndexRebuildFuture(idxRebuildFuts.put(cctx.cacheId(), intRebFut));
+
rebuildCacheIdxFut.listen(fut -> {
Throwable err = fut.error();
@@ -2105,9 +2117,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
U.error(log, "Failed to rebuild indexes for cache: " + cacheName, err);
outRebuildCacheIdxFut.onDone(err);
+
+ idxRebuildFuts.remove(cctx.cacheId(), intRebFut);
+ intRebFut.onDone(err);
});
- rebuildIndexesFromHash0(cctx, clo, rebuildCacheIdxFut);
+ rebuildIndexesFromHash0(cctx, clo, rebuildCacheIdxFut, intRebFut.cancelToken());
return outRebuildCacheIdxFut;
}
@@ -2118,13 +2133,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param cctx Cache context.
* @param clo Closure.
* @param rebuildIdxFut Future for rebuild indexes.
+ * @param cancel Cancellation token.
*/
protected void rebuildIndexesFromHash0(
GridCacheContext cctx,
SchemaIndexCacheVisitorClosure clo,
- GridFutureAdapter<Void> rebuildIdxFut
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexOperationCancellationToken cancel
) {
- new SchemaIndexCacheVisitorImpl(cctx, null, rebuildIdxFut).visit(clo);
+ new SchemaIndexCacheVisitorImpl(cctx, cancel, rebuildIdxFut).visit(clo);
}
/**
@@ -2465,6 +2482,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public void unregisterCache(GridCacheContextInfo cacheInfo, boolean rmvIdx) {
+ cancelIndexRebuildFuture(idxRebuildFuts.remove(cacheInfo.cacheId()));
+
rowCache.onCacheUnregistered(cacheInfo);
String cacheName = cacheInfo.name();
@@ -3270,4 +3289,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
defragmentationThreadPool
);
}
+
+ /**
+ * Cancels rebuilding indexes for the cache through its future and waits for the rebuild to stop.
+ *
+ * @param rebFut Index rebuilding future.
+ */
+ private void cancelIndexRebuildFuture(@Nullable SchemaIndexCacheFuture rebFut) {
+ if (rebFut != null && !rebFut.isDone() && rebFut.cancelToken().cancel()) {
+ try {
+ rebFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ if (!(e instanceof SchemaIndexOperationCancellationException))
+ log.warning("Error after canceling index rebuild.", e);
+ }
+ }
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
index e3986ba..7144d43 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractIndexingCommonTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
@@ -154,14 +155,15 @@ public class AbstractIndexingCommonTest extends GridCommonAbstractTest {
@Override protected void rebuildIndexesFromHash0(
GridCacheContext cctx,
SchemaIndexCacheVisitorClosure clo,
- GridFutureAdapter<Void> rebuildIdxFut
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexOperationCancellationToken cancel
) {
CountDownLatch startThread = new CountDownLatch(1);
new Thread(() -> {
startThread.countDown();
- new SchemaIndexCacheVisitorImpl(cctx, null, rebuildIdxFut) {
+ new SchemaIndexCacheVisitorImpl(cctx, cancel, rebuildIdxFut) {
/** {@inheritDoc} */
@Override protected void beforeExecute() {
String cacheName = cctx.name();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
new file mode 100644
index 0000000..791db3cd
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheCompoundFuture;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheStat;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationException;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Class for checking the correct completion/stop of index rebuilding.
+ */
+public class StopRebuildIndexTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ IgniteH2IndexingEx.cacheRowConsumer.clear();
+ IgniteH2IndexingEx.cacheRebuildRunner.clear();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ IgniteH2IndexingEx.cacheRowConsumer.clear();
+ IgniteH2IndexingEx.cacheRebuildRunner.clear();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+ ).setCacheConfiguration(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Person.class)
+ );
+ }
+
+ /**
+ * Checks the correctness of {@link SchemaIndexCacheCompoundFuture}.
+ */
+ @Test
+ public void testSchemaIndexCacheCompoundFeature() {
+ SchemaIndexCacheCompoundFuture compoundFut = new SchemaIndexCacheCompoundFuture();
+ assertFalse(compoundFut.isDone());
+
+ GridFutureAdapter<SchemaIndexCacheStat> fut0 = new GridFutureAdapter<>();
+ GridFutureAdapter<SchemaIndexCacheStat> fut1 = new GridFutureAdapter<>();
+ GridFutureAdapter<SchemaIndexCacheStat> fut2 = new GridFutureAdapter<>();
+ GridFutureAdapter<SchemaIndexCacheStat> fut3 = new GridFutureAdapter<>();
+
+ compoundFut.add(fut0).add(fut1).add(fut2).add(fut3);
+ assertFalse(compoundFut.isDone());
+
+ fut0.onDone();
+ assertFalse(compoundFut.isDone());
+
+ fut1.onDone();
+ assertFalse(compoundFut.isDone());
+
+ fut2.onDone();
+ assertFalse(compoundFut.isDone());
+
+ fut3.onDone();
+ assertFalse(compoundFut.isDone());
+
+ compoundFut.markInitialized();
+ assertTrue(compoundFut.isDone());
+ assertNull(compoundFut.error());
+
+ compoundFut = new SchemaIndexCacheCompoundFuture();
+ fut0 = new GridFutureAdapter<>();
+ fut1 = new GridFutureAdapter<>();
+ fut2 = new GridFutureAdapter<>();
+ fut3 = new GridFutureAdapter<>();
+
+ compoundFut.add(fut0).add(fut1).add(fut2).add(fut3).markInitialized();
+ assertFalse(compoundFut.isDone());
+
+ fut0.onDone();
+ assertFalse(compoundFut.isDone());
+
+ Exception err0 = new Exception();
+ Exception err1 = new Exception();
+
+ fut1.onDone(err0);
+ assertFalse(compoundFut.isDone());
+
+ fut2.onDone(err1);
+ assertFalse(compoundFut.isDone());
+
+ fut3.onDone(err1);
+ assertTrue(compoundFut.isDone());
+ assertEquals(err0, compoundFut.error().getCause());
+ }
+
+ /**
+ * Checking that when the cluster is deactivated, index rebuilding will be completed correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStopRebuildIndexesOnDeactivation() throws Exception {
+ stopRebuildIndexes(n -> n.cluster().state(INACTIVE), true);
+
+ assertEquals(1, G.allGrids().size());
+ }
+
+ /**
+ * Checking that when the node is stopped, index rebuilding will be completed correctly.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStopRebuildIndexesOnStopNode() throws Exception {
+ stopRebuildIndexes(n -> stopAllGrids(), false);
+ }
+
+ /**
+ * Checking the correctness of the {@code IgniteH2Indexing#idxRebuildFuts}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testInternalIndexingRebuildFuture() throws Exception {
+ GridQueryProcessor.idxCls = IgniteH2IndexingEx.class;
+
+ IgniteEx n = prepareCluster(10);
+
+ GridFutureAdapter<?> f0 = new GridFutureAdapter<>();
+ GridFutureAdapter<?> f1 = new GridFutureAdapter<>();
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+ IgniteH2IndexingEx.cacheRebuildRunner.put(
+ DEFAULT_CACHE_NAME, () -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId())));
+
+ IgniteH2IndexingEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
+ f0.onDone();
+
+ f1.get(getTestTimeout());
+ });
+
+ n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtx));
+
+ IgniteInternalFuture<?> rebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
+ assertNotNull(rebFut0);
+
+ SchemaIndexCacheFuture rebFut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
+ assertNotNull(rebFut1);
+
+ f0.get(getTestTimeout());
+ assertFalse(rebFut0.isDone());
+
+ assertFalse(rebFut1.isDone());
+ assertFalse(rebFut1.cancelToken().isCancelled());
+
+ f1.onDone();
+
+ rebFut0.get(getTestTimeout());
+ rebFut1.get(getTestTimeout());
+
+ assertFalse(rebFut1.cancelToken().isCancelled());
+
+ assertNull(indexRebuildFuture(n, cacheCtx.cacheId()));
+ assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId()));
+ }
+
+ /**
+ * Checks that when starting an index rebuild sequentially,
+ * the previous rebuild will be canceled and a new one will start.
+ *
+ * This behavior should be discussed in IGNITE-14321.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSequentialRebuildIndexes() throws Exception {
+ IgniteEx n = prepareCluster(10);
+
+ int cacheId = n.cachex(DEFAULT_CACHE_NAME).context().cacheId();
+ int cacheSize = n.cachex(DEFAULT_CACHE_NAME).size();
+
+ stopAllGrids();
+ deleteIndexBin(n.context().igniteInstanceName());
+
+ GridQueryProcessor.idxCls = IgniteH2IndexingEx.class;
+
+ GridFutureAdapter<?> startBlockRebIdxFut0 = new GridFutureAdapter<>();
+ GridFutureAdapter<?> endBlockRebIdxFut0 = new GridFutureAdapter<>();
+
+ IgniteH2IndexingEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
+ IgniteInternalFuture<?> fut = indexRebuildFuture(grid(0), cacheId);
+ assertNotNull(fut);
+ assertFalse(fut.isDone());
+
+ startBlockRebIdxFut0.onDone();
+ endBlockRebIdxFut0.get(getTestTimeout());
+ });
+
+ n = startGrid(0);
+
+ n.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ startBlockRebIdxFut0.get(getTestTimeout());
+
+ IgniteInternalFuture<?> rebIdxFut = indexRebuildFuture(n, cacheId);
+ assertNotNull(rebIdxFut);
+ assertFalse(rebIdxFut.isDone());
+
+ SchemaIndexCacheFuture intRebIdxFut = internalIndexRebuildFuture(n, cacheId);
+ assertNotNull(intRebIdxFut);
+ assertFalse(intRebIdxFut.isDone());
+ assertFalse(intRebIdxFut.cancelToken().isCancelled());
+
+ GridFutureAdapter<IgniteInternalFuture<?>> forceRebIdxFut = new GridFutureAdapter<>();
+
+ IgniteInternalFuture<?> startForceRebIdxFut = runAsync(() -> {
+ IgniteEx n0 = grid(0);
+
+ IgniteH2IndexingEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
+ forceRebIdxFut.onDone(internalIndexRebuildFuture(n0, cacheId));
+ });
+
+ n0.context().cache().context().database().forceRebuildIndexes(
+ F.asList(n0.cachex(DEFAULT_CACHE_NAME).context()));
+
+ return null;
+ });
+
+ assertTrue(waitForCondition(intRebIdxFut.cancelToken()::isCancelled, getTestTimeout()));
+ endBlockRebIdxFut0.onDone();
+
+ rebIdxFut.get(getTestTimeout());
+
+ assertThrows(
+ log,
+ () -> intRebIdxFut.get(getTestTimeout()),
+ SchemaIndexOperationCancellationException.class,
+ null
+ );
+
+ startForceRebIdxFut.get(getTestTimeout());
+ forceRebIdxFut.get(getTestTimeout()).get(getTestTimeout());
+
+ assertEquals(cacheSize, cacheMetrics0(n, DEFAULT_CACHE_NAME).getIndexRebuildKeysProcessed());
+ }
+
+ /**
+ * Restart the rebuild of the indexes, checking that it completes gracefully.
+ *
+ * @param stopRebuildIndexes Stop index rebuild function.
+ * @param expThrowEx Expect an exception on index rebuild futures.
+ * @throws Exception If failed.
+ */
+ private void stopRebuildIndexes(
+ IgniteThrowableConsumer<IgniteEx> stopRebuildIndexes,
+ boolean expThrowEx
+ ) throws Exception {
+ GridQueryProcessor.idxCls = IgniteH2IndexingEx.class;
+
+ int keys = 100_000;
+ IgniteEx n = prepareCluster(keys);
+
+ IgniteH2IndexingEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
+ U.sleep(10);
+ });
+
+ GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+ n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtx));
+
+ IgniteInternalFuture<?> fut0 = indexRebuildFuture(n, cacheCtx.cacheId());
+ assertNotNull(fut0);
+
+ SchemaIndexCacheFuture fut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
+ assertNotNull(fut1);
+
+ CacheMetricsImpl metrics0 = cacheMetrics0(n, DEFAULT_CACHE_NAME);
+ assertTrue(metrics0.isIndexRebuildInProgress());
+ assertFalse(fut0.isDone());
+
+ assertFalse(fut1.isDone());
+ assertFalse(fut1.cancelToken().isCancelled());
+
+ assertTrue(waitForCondition(() -> metrics0.getIndexRebuildKeysProcessed() >= keys / 100, getTestTimeout()));
+ assertTrue(metrics0.isIndexRebuildInProgress());
+ assertFalse(fut0.isDone());
+
+ assertFalse(fut1.isDone());
+ assertFalse(fut1.cancelToken().isCancelled());
+
+ stopRebuildIndexes.accept(n);
+
+ assertFalse(metrics0.isIndexRebuildInProgress());
+ assertTrue(metrics0.getIndexRebuildKeysProcessed() < keys);
+
+ if (expThrowEx) {
+ assertThrows(log, () -> fut0.get(getTestTimeout()), SchemaIndexOperationCancellationException.class, null);
+ assertThrows(log, () -> fut1.get(getTestTimeout()), SchemaIndexOperationCancellationException.class, null);
+
+ assertTrue(fut1.cancelToken().isCancelled());
+ }
+ else {
+ fut0.get(getTestTimeout());
+
+ fut1.get(getTestTimeout());
+ assertFalse(fut1.cancelToken().isCancelled());
+ }
+
+ assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId()));
+ }
+
+ /**
+ * Prepare cluster for test.
+ *
+ * @param keys Key count.
+ * @throws Exception If failed.
+ */
+ private IgniteEx prepareCluster(int keys) throws Exception {
+ IgniteEx n = startGrid(0);
+
+ n.cluster().state(ACTIVE);
+
+ for (int i = 0; i < keys; i++)
+ n.cache(DEFAULT_CACHE_NAME).put(i, new Person(i, "p_" + i));
+
+ return n;
+ }
+
+ /**
+ * Getting internal rebuild index future for the cache.
+ *
+ * @param n Node.
+ * @param cacheId Cache id.
+ * @return Rebuild index future.
+ */
+ @Nullable private SchemaIndexCacheFuture internalIndexRebuildFuture(IgniteEx n, int cacheId) {
+ GridQueryIndexing indexing = n.context().query().getIndexing();
+
+ return ((Map<Integer, SchemaIndexCacheFuture>)getFieldValueHierarchy(indexing, "idxRebuildFuts")).get(cacheId);
+ }
+
+ /**
+ * Getting rebuild index future for the cache.
+ *
+ * @param n Node.
+ * @param cacheId Cache id.
+ * @return Rebuild index future.
+ */
+ @Nullable private IgniteInternalFuture<?> indexRebuildFuture(IgniteEx n, int cacheId) {
+ return n.context().query().indexRebuildFuture(cacheId);
+ }
+
+ /**
+ * Getting cache metrics.
+ *
+ * @param n Node.
+ * @param cacheName Cache name.
+ * @return Cache metrics.
+ */
+ private CacheMetricsImpl cacheMetrics0(IgniteEx n, String cacheName) {
+ return n.cachex(cacheName).context().cache().metrics0();
+ }
+
+ /**
+ * Extension of {@link IgniteH2Indexing} for the test.
+ */
+ private static class IgniteH2IndexingEx extends IgniteH2Indexing {
+ /** Consumer for cache rows when rebuilding indexes. */
+ private static final Map<String, IgniteThrowableConsumer<CacheDataRow>> cacheRowConsumer =
+ new ConcurrentHashMap<>();
+
+ /** A function that should run before preparing to rebuild the cache indexes. */
+ private static final Map<String, Runnable> cacheRebuildRunner = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected void rebuildIndexesFromHash0(
+ GridCacheContext cctx,
+ SchemaIndexCacheVisitorClosure clo,
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexOperationCancellationToken cancel
+ ) {
+ super.rebuildIndexesFromHash0(cctx, new SchemaIndexCacheVisitorClosure() {
+ /** {@inheritDoc} */
+ @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
+ cacheRowConsumer.getOrDefault(cctx.name(), r -> {}).accept(row);
+
+ clo.apply(row);
+ }
+ }, rebuildIdxFut, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+ cacheRebuildRunner.getOrDefault(cctx.name(), () -> {}).run();
+
+ return super.rebuildIndexesFromHash(cctx);
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 4698c00..ea130c9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -130,6 +130,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
@@ -1803,7 +1804,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
* Tests a scenario when a coordinator has failed after recovery during activation.
*/
@Test
- @WithSystemProperty(key = "IGNITE_DISABLE_WAL_DURING_REBALANCING", value = "false")
+ @WithSystemProperty(key = IGNITE_DISABLE_WAL_DURING_REBALANCING, value = "false")
public void testRecoveryAfterRestart_Activate() throws Exception {
IgniteEx crd = startGrid(1);
crd.cluster().active(true);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
index 8aa902b..06f2f20 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -395,7 +396,8 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
@Override protected void rebuildIndexesFromHash0(
GridCacheContext cctx,
SchemaIndexCacheVisitorClosure clo,
- GridFutureAdapter<Void> rebuildIdxFut
+ GridFutureAdapter<Void> rebuildIdxFut,
+ SchemaIndexOperationCancellationToken cancel
) {
if (!firstRbld) {
try {
@@ -419,7 +421,7 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
});
}
- super.rebuildIndexesFromHash0(cctx, clo, rebuildIdxFut);
+ super.rebuildIndexesFromHash0(cctx, clo, rebuildIdxFut, cancel);
}
}
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 239d293..cdd352b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.encryption.CacheGroupReencryptionTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingAndGroupPutGetPersistenceSelfTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
+import org.apache.ignite.internal.processors.cache.index.StopRebuildIndexTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsIndexingDefragmentationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
@@ -62,7 +63,8 @@ import org.junit.runners.Suite;
ClientReconnectWithSqlTableConfiguredTest.class,
MultipleParallelCacheDeleteDeadlockTest.class,
CacheGroupReencryptionTest.class,
- IgnitePdsIndexingDefragmentationTest.class
+ IgnitePdsIndexingDefragmentationTest.class,
+ StopRebuildIndexTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}