You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/12/23 15:38:15 UTC
[ignite] branch master updated: IGNITE-13866 validate_indexes
command is interrupted if connection to initiator is broken. Fixes #8593
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 873bb4f IGNITE-13866 validate_indexes command is interrupted if connection to initiator is broken. Fixes #8593
873bb4f is described below
commit 873bb4f67716cf41fd1bc339a026dbeaf7fa664f
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Dec 23 18:37:33 2020 +0300
IGNITE-13866 validate_indexes command is interrupted if connection to initiator is broken. Fixes #8593
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../testsuites/IgniteControlUtilityTestSuite.java | 2 +
.../GridCommandHandlerInterruptCommandTest.java | 326 +++++++++++++++++++++
.../rest/protocols/tcp/GridTcpRestNioListener.java | 104 ++++++-
.../visor/annotation/InterruptibleVisorTask.java | 32 ++
.../internal/visor/verify/VisorIdleVerifyJob.java | 13 +
.../visor/verify/ValidateIndexesClosure.java | 254 ++++++++++------
.../visor/verify/ValidateIndexesContext.java | 30 ++
.../visor/verify/VisorValidateIndexesTask.java | 18 ++
.../IgnitePdsIndexingDefragmentationTest.java | 1 +
...xingMultithreadedLoadContinuousRestartTest.java | 4 +-
.../IgniteClusterSnapshotWithIndexesTest.java | 3 +-
.../processors/database/RebuildIndexTest.java | 2 +-
.../RebuildIndexWithHistoricalRebalanceTest.java | 2 +-
13 files changed, 684 insertions(+), 107 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index cac12d4..6466487 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassTest;
import org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerIndexingTest;
import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest;
+import org.apache.ignite.util.GridCommandHandlerInterruptCommandTest;
import org.apache.ignite.util.GridCommandHandlerMetadataTest;
import org.apache.ignite.util.GridCommandHandlerPropertiesTest;
import org.apache.ignite.util.GridCommandHandlerSslTest;
@@ -68,6 +69,7 @@ import org.junit.runners.Suite;
GridCommandHandlerIndexingClusterByClassWithSSLTest.class,
GridCommandHandlerIndexingCheckSizeTest.class,
GridCommandHandlerCheckIndexesInlineSizeTest.class,
+ GridCommandHandlerInterruptCommandTest.class,
GridCommandHandlerMetadataTest.class,
KillCommandsCommandShTest.class,
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerInterruptCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerInterruptCommandTest.java
new file mode 100644
index 0000000..d750439
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerInterruptCommandTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DeploymentEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
+
+/**
+ * Checks how control utility commands react to an interruption of the initiating client:
+ * {@code validate_indexes} has to be cancelled on the cluster, while {@code idle_verify} has to keep running.
+ */
+public class GridCommandHandlerInterruptCommandTest extends GridCommandHandlerAbstractTest {
+    /** Load loop cycles. */
+    private static final int LOAD_LOOP = 500_000;
+
+    /** Idle verify task name. */
+    private static final String IDLE_VERIFY_TASK_V2 = "org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2";
+
+    /** Validate index task name. */
+    private static final String VALIDATE_INDEX_TASK = "org.apache.ignite.internal.visor.verify.VisorValidateIndexesTask";
+
+    /** Log listener. */
+    private ListeningTestLogger lnsrLog;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setGridLogger(lnsrLog)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setInitialSize(200L * 1024 * 1024)
+                    .setMaxSize(200L * 1024 * 1024)
+                )
+            )
+            .setIncludeEventTypes(EventType.EVT_TASK_DEPLOYED)
+            .setCacheConfiguration(new CacheConfiguration<Integer, UserValue>(DEFAULT_CACHE_NAME)
+                .setName(DEFAULT_CACHE_NAME)
+                .setQueryEntities(Collections.singleton(createQueryEntity())));
+    }
+
+    /**
+     * Creates a predefined query entity with three integer fields, each covered by its own sorted index.
+     *
+     * @return Query entity.
+     */
+    private QueryEntity createQueryEntity() {
+        QueryEntity qryEntity = new QueryEntity();
+
+        qryEntity.setKeyType(Integer.class.getTypeName());
+        qryEntity.setValueType(UserValue.class.getName());
+        qryEntity.setTableName("USER_TEST_TABLE");
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        fields.put("x", "java.lang.Integer");
+        fields.put("y", "java.lang.Integer");
+        fields.put("z", "java.lang.Integer");
+
+        qryEntity.setFields(fields);
+
+        qryEntity.setIndexes(Arrays.asList(
+            sortedIndex("IDX_2", "x"),
+            sortedIndex("IDX_3", "y"),
+            sortedIndex("IDX_4", "z")
+        ));
+
+        return qryEntity;
+    }
+
+    /**
+     * Creates a sorted index over a single field.
+     *
+     * @param name Index name.
+     * @param field Name of the indexed field.
+     * @return Query index.
+     */
+    private static QueryIndex sortedIndex(String name, String field) {
+        LinkedHashMap<String, Boolean> idxFields = new LinkedHashMap<>();
+
+        idxFields.put(field, false);
+
+        QueryIndex idx = new QueryIndex();
+
+        idx.setName(name);
+        idx.setIndexType(QueryIndexType.SORTED);
+        idx.setFields(idxFields);
+
+        return idx;
+    }
+
+    /**
+     * User value.
+     */
+    private static class UserValue {
+        /** X. */
+        private int x;
+
+        /** Y. */
+        private int y;
+
+        /** Z. */
+        private int z;
+
+        /**
+         * @param x X.
+         * @param y Y.
+         * @param z Z.
+         */
+        public UserValue(int x, int y, int z) {
+            this.x = x;
+            this.y = y;
+            this.z = z;
+        }
+
+        /**
+         * @param seed Seed.
+         */
+        public UserValue(long seed) {
+            x = (int)(seed % 6991);
+            y = (int)(seed % 18679);
+            z = (int)(seed % 31721);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(UserValue.class, this);
+        }
+    }
+
+    /**
+     * Checks that the validate_indexes command is cancelled on the cluster after its initiator is interrupted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testValidateIndexesCommand() throws Exception {
+        lnsrLog = new ListeningTestLogger(false, log);
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        preloadData(ignite);
+
+        CountDownLatch startTaskLatch = waitForTaskEvent(ignite, VALIDATE_INDEX_TASK);
+
+        LogListener lnsrValidationCancelled = LogListener.matches("Index validation was cancelled.").build();
+
+        lnsrLog.registerListener(lnsrValidationCancelled);
+
+        // Exit codes are primitive ints: compare them by value, not by identity of the boxed objects.
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+            assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "validate_indexes")));
+
+        startTaskLatch.await();
+
+        fut.cancel();
+
+        fut.get();
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            ignite.compute().activeTaskFutures().isEmpty(), 10_000));
+
+        assertTrue(GridTestUtils.waitForCondition(lnsrValidationCancelled::check, 10_000));
+    }
+
+    /**
+     * Checks that the idle_verify command is not cancelled when the initiator client is interrupted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIdleVerifyCommand() throws Exception {
+        lnsrLog = new ListeningTestLogger(false, log);
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        preloadData(ignite);
+
+        CountDownLatch startTaskLatch = waitForTaskEvent(ignite, IDLE_VERIFY_TASK_V2);
+
+        LogListener lnsrValidationCancelled = LogListener.matches("Idle verify was cancelled.").build();
+
+        lnsrLog.registerListener(lnsrValidationCancelled);
+
+        // Exit codes are primitive ints: compare them by value, not by identity of the boxed objects.
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+            assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "idle_verify")));
+
+        startTaskLatch.await();
+
+        fut.cancel();
+
+        fut.get();
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            ignite.compute().activeTaskFutures().isEmpty(), 30_000));
+
+        assertFalse(lnsrValidationCancelled.check());
+    }
+
+    /**
+     * Subscribes to task deployment events and returns a latch that opens once the given task is deployed.
+     *
+     * @param ignite Ignite.
+     * @param taskName Task name.
+     * @return Latch which will open after event received.
+     */
+    private CountDownLatch waitForTaskEvent(IgniteEx ignite, String taskName) {
+        CountDownLatch startTaskLatch = new CountDownLatch(1);
+
+        ignite.events().localListen((evt) -> {
+            assertTrue(evt instanceof DeploymentEvent);
+
+            if (taskName.equals(((DeploymentEvent)evt).alias())) {
+                startTaskLatch.countDown();
+
+                // The awaited task has been seen: returning false unsubscribes the listener.
+                return false;
+            }
+
+            return true;
+        }, EventType.EVT_TASK_DEPLOYED);
+
+        return startTaskLatch;
+    }
+
+    /**
+     * Preloads data to the default cache.
+     *
+     * @param ignite Ignite.
+     */
+    private void preloadData(IgniteEx ignite) {
+        try (IgniteDataStreamer<Integer, UserValue> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+            for (int i = 0; i < LOAD_LOOP; i++)
+                streamer.addData(i, new UserValue(i));
+        }
+    }
+
+    /**
+     * Invokes the index validation closure directly and cancels it after it has started.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelValidateIndexesClosure() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+
+        ignite0.cluster().active(true);
+
+        preloadData(ignite0);
+
+        AtomicBoolean cancelled = new AtomicBoolean(false);
+
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(cancelled::get, Collections.singleton(DEFAULT_CACHE_NAME),
+            0, 0, false, true);
+
+        ListeningTestLogger listeningLogger = new ListeningTestLogger(false, log);
+
+        // The closure is called outside of the compute grid, so its resources are set by hand.
+        GridTestUtils.setFieldValue(clo, "ignite", ignite0);
+        GridTestUtils.setFieldValue(clo, "log", listeningLogger);
+
+        LogListener lnsrValidationStarted = LogListener.matches("Current progress of ValidateIndexesClosure").build();
+
+        listeningLogger.registerListener(lnsrValidationStarted);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+            GridTestUtils.assertThrows(log, clo::call, IgniteException.class, ValidateIndexesClosure.CANCELLED_MSG));
+
+        assertTrue(GridTestUtils.waitForCondition(lnsrValidationStarted::check, 10_000));
+
+        assertFalse(fut.isDone());
+
+        cancelled.set(true);
+
+        fut.get(10_000);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index 6195460..aace013 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -23,6 +23,8 @@ import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -67,6 +69,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.annotation.InterruptibleVisorTask;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_APPEND;
@@ -159,6 +162,10 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
/** Handler for all Redis requests. */
private GridRedisNioListener redisLsnr;
+ /** Futures of currently executing tasks that can be interrupted if their session is closed
+ * (e.g. because of user interrupting control.sh operation). */
+ private final Map<GridNioSession, Set<IgniteInternalFuture>> sesInterruptibleFutMap = new ConcurrentHashMap<>();
+
/**
* Creates listener which will convert incoming tcp packets to rest requests and forward them to
* a given rest handler.
@@ -196,6 +203,8 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
/** {@inheritDoc} */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ onSessionClosed(ses);
+
if (e != null) {
if (e instanceof RuntimeException)
U.error(log, "Failed to process request from remote client: " + ses, e);
@@ -222,7 +231,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
", ver=" + ver +
", supported=" + SUPP_VERS + ']');
- ses.close();
+ onSessionClosed(ses);
}
else {
byte marshId = hs.marshallerId();
@@ -233,7 +242,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
} catch (IgniteInterruptedCheckedException e) {
U.error(log, "Marshaller is not initialized.", e);
- ses.close();
+ onSessionClosed(ses);
return;
}
@@ -245,7 +254,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
U.error(log, "Client marshaller ID is invalid. Note that .NET and C++ clients " +
"are supported only in enterprise edition [ses=" + ses + ", marshId=" + marshId + ']');
- ses.close();
+ onSessionClosed(ses);
}
else {
ses.addMeta(MARSHALLER.ordinal(), marsh);
@@ -257,9 +266,16 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
else {
final GridRestRequest req = createRestRequest(ses, msg);
- if (req != null)
- hnd.handleAsync(req).listen(new CI1<IgniteInternalFuture<GridRestResponse>>() {
+ if (req != null) {
+ IgniteInternalFuture<GridRestResponse> taskFut = hnd.handleAsync(req);
+
+ if (isInterruptible(msg))
+ addFutureToSession(ses, taskFut);
+
+ taskFut.listen(new CI1<IgniteInternalFuture<GridRestResponse>>() {
@Override public void apply(IgniteInternalFuture<GridRestResponse> fut) {
+ removeFutureFromSession(ses, taskFut);
+
GridClientResponse res = new GridClientResponse();
res.requestId(msg.requestId());
@@ -303,6 +319,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
});
}
});
+ }
else
U.error(log, "Failed to process client request (unknown packet type) [ses=" + ses +
", msg=" + msg + ']');
@@ -310,6 +327,81 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
}
/**
+ * This method checks if request in client message can be interrupted.
+ *
+ * @param msg Message.
+ * @return True if the task can be interrupted, false otherwise.
+ */
+    private boolean isInterruptible(GridClientMessage msg) {
+        // Only task requests carry a task class name that can be inspected for the marker annotation.
+        if (msg instanceof GridClientTaskRequest) {
+            GridClientTaskRequest taskReq = (GridClientTaskRequest)msg;
+
+            try {
+                Class<?> taskCls = U.forName(taskReq.taskName(), null);
+
+                return U.hasAnnotation(taskCls, InterruptibleVisorTask.class);
+            }
+            catch (ClassNotFoundException e) {
+                // An unresolvable task class is reported and treated as not interruptible.
+                log.warning("Task closure can't be found: [task=" + taskReq.taskName() + ']', e);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Registers an operation future under its session so that it can be looked up while the session is alive.
+     *
+     * @param ses Session.
+     * @param fut Operation future.
+     */
+    private void addFutureToSession(GridNioSession ses, IgniteInternalFuture fut) {
+        // Lazily create a concurrent set for the session on its first tracked future.
+        Set<IgniteInternalFuture> sesFuts = sesInterruptibleFutMap.computeIfAbsent(ses, k -> ConcurrentHashMap.newKeySet());
+
+        sesFuts.add(fut);
+    }
+
+    /**
+     * Stops tracking a completed operation future for the given session.
+     * The session entry itself stays in the map, even if no futures are left for it.
+     *
+     * @param ses Session.
+     * @param fut Operation future.
+     */
+    private void removeFutureFromSession(GridNioSession ses, IgniteInternalFuture fut) {
+        assert fut.isDone() : "Operation is in progress, session: " + ses;
+
+        Set<IgniteInternalFuture> sesFuts = sesInterruptibleFutMap.get(ses);
+
+        // Nothing is tracked for sessions that never ran an interruptible task.
+        if (sesFuts != null)
+            sesFuts.remove(fut);
+    }
+
+    /**
+     * Cancels all not yet completed futures tracked for the given session, forgets them and closes the session.
+     *
+     * @param ses Session.
+     */
+    public void onSessionClosed(GridNioSession ses) {
+        // Detach the tracked futures with a single atomic removal and cancel them outside of any map operation.
+        // A future's completion listener updates this same map, so cancelling inside a remapping function
+        // could re-enter the map while it is being updated.
+        Set<IgniteInternalFuture> pendingFuts = sesInterruptibleFutMap.remove(ses);
+
+        if (pendingFuts != null) {
+            for (IgniteInternalFuture fut : pendingFuts) {
+                try {
+                    if (!fut.isDone())
+                        fut.cancel();
+                }
+                catch (IgniteCheckedException e) {
+                    // Best effort: a failed cancellation must not prevent closing the session.
+                    log.warning("Future was not cancelled: " + fut, e);
+                }
+            }
+        }
+
+        ses.close();
+    }
+
+ /**
* Creates a REST request object from client TCP binary packet.
*
* @param ses NIO session.
@@ -459,6 +551,6 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
* @param ses Session, that was inactive.
*/
@Override public void onSessionIdleTimeout(GridNioSession ses) {
- ses.close();
+ onSessionClosed(ses);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/annotation/InterruptibleVisorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/annotation/InterruptibleVisorTask.java
new file mode 100644
index 0000000..09439b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/annotation/InterruptibleVisorTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a Visor task that can be interrupted for a system reason,
+ * for example, when the connection between the cluster and the task initiator is broken.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface InterruptibleVisorTask {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
index d4652cc..ceeb717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.visor.verify;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoggerResource;
/**
*
@@ -41,6 +43,10 @@ class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, Resul
@JobContextResource
protected transient ComputeJobContext jobCtx;
+ /** Injected logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
/** Task class for execution */
private final Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls;
@@ -76,6 +82,13 @@ class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, Resul
}
/** {@inheritDoc} */
+ @Override public void cancel() {
+ // Leave a trace of the cancellation before delegating to the default behavior.
+ // GridCommandHandlerInterruptCommandTest listens for this exact message.
+ log.warning("Idle verify was cancelled.");
+
+ super.cancel();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(VisorIdleVerifyJob.class, this);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index c83d62c..64c17eb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -113,6 +113,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
/** */
private static final long serialVersionUID = 0L;
+ /** Message of the exception thrown when the closure is cancelled. */
+ public static final String CANCELLED_MSG = "Closure of index validation was cancelled.";
+
/** Ignite. */
@IgniteInstanceResource
private transient IgniteEx ignite;
@@ -169,16 +172,28 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
/** Group cache ids when calculating cache size was an error. */
private final Set<Integer> failCalcCacheSizeGrpIds = newSetFromMap(new ConcurrentHashMap<>());
+ /** Validate index context. */
+ private final ValidateIndexesContext validateCtx;
+
/**
* Constructor.
*
+ * @param validateCtx Context of validate index closure.
* @param cacheNames Cache names.
* @param checkFirst If positive only first K elements will be validated.
* @param checkThrough If positive only each Kth element will be validated.
* @param checkCrc Check CRC sum on stored pages on disk.
* @param checkSizes Check that index size and cache size are same.
*/
- public ValidateIndexesClosure(Set<String> cacheNames, int checkFirst, int checkThrough, boolean checkCrc, boolean checkSizes) {
+ public ValidateIndexesClosure(
+ ValidateIndexesContext validateCtx,
+ Set<String> cacheNames,
+ int checkFirst,
+ int checkThrough,
+ boolean checkCrc,
+ boolean checkSizes
+ ) {
+ this.validateCtx = validateCtx;
this.cacheNames = cacheNames;
this.checkFirst = checkFirst;
this.checkThrough = checkThrough;
@@ -241,6 +256,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
*
*/
private VisorValidateIndexesJobResult call0() {
+ if (validateCtx.isCancelled())
+ throw new IgniteException(CANCELLED_MSG);
+
Set<Integer> grpIds = collectGroupIds();
/** Update counters per partition per group. */
@@ -393,6 +411,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
throw unwrapFutureException(e);
}
+ if (validateCtx.isCancelled())
+ throw new IgniteException(CANCELLED_MSG);
+
return new VisorValidateIndexesJobResult(
partResults,
idxResults,
@@ -514,7 +535,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
CacheGroupContext grpCtx,
GridDhtLocalPartition part
) {
- if (!part.reserve())
+ if (validateCtx.isCancelled() || !part.reserve())
return emptyMap();
ValidateIndexesPartitionResult partRes;
@@ -607,9 +628,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
long current = 0;
long processedNumber = 0;
- while (it.hasNextX()) {
- if (enoughIssues)
- break;
+ while (it.hasNextX() && !validateCtx.isCancelled()) {
+ if (enoughIssues)
+ break;
CacheDataRow row = it.nextX();
@@ -677,9 +698,12 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
ArrayList<Index> indexes = gridH2Tbl.getIndexes();
- for (Index idx : indexes) {
- if (!(idx instanceof H2TreeIndexBase))
- continue;
+ for (Index idx : indexes) {
+ if (validateCtx.isCancelled())
+ break;
+
+ if (!(idx instanceof H2TreeIndexBase))
+ continue;
try {
Cursor cursor = idx.find(session, h2Row, h2Row);
@@ -792,6 +816,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
T2<GridCacheContext, Index> cacheCtxWithIdx,
IgniteInClosure<Integer> idleChecker
) {
+ if (validateCtx.isCancelled())
+ return emptyMap();
+
GridCacheContext ctx = cacheCtxWithIdx.get1();
Index idx = cacheCtxWithIdx.get2();
@@ -826,7 +853,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
KeyCacheObject previousKey = null;
- while (!enoughIssues) {
+ while (!enoughIssues && !validateCtx.isCancelled()) {
KeyCacheObject h2key = null;
try {
@@ -943,110 +970,124 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
GridDhtLocalPartition locPart
) {
return calcExecutor.submit(() -> {
- try {
- @Nullable PartitionUpdateCounter updCntr = locPart.dataStore().partUpdateCounter();
+ return calcCacheSize(grpCtx, locPart);
+ });
+ }
- PartitionUpdateCounter updateCntrBefore = updCntr == null ? updCntr : updCntr.copy();
+ /**
+ * Calculates cache sizes broken down by table.
+ *
+ * @param grpCtx Cache group context.
+ * @param locPart Local partition.
+ * @return Cache size representation object.
+ */
+ private CacheSize calcCacheSize(CacheGroupContext grpCtx,GridDhtLocalPartition locPart) {
+ try {
+ if (validateCtx.isCancelled())
+ return new CacheSize(null, emptyMap());
- int grpId = grpCtx.groupId();
+ @Nullable PartitionUpdateCounter updCntr = locPart.dataStore().partUpdateCounter();
- if (failCalcCacheSizeGrpIds.contains(grpId))
- return new CacheSize(null, null);
+ PartitionUpdateCounter updateCntrBefore = updCntr == null ? updCntr : updCntr.copy();
- boolean reserve = false;
+ int grpId = grpCtx.groupId();
- int partId = locPart.id();
+ if (failCalcCacheSizeGrpIds.contains(grpId))
+ return new CacheSize(null, null);
- try {
- if (!(reserve = locPart.reserve()))
- throw new IgniteException("Can't reserve partition");
+ boolean reserve = false;
- if (locPart.state() != OWNING)
- throw new IgniteException("Partition not in state " + OWNING);
+ int partId = locPart.id();
- Map<Integer, Map<String, AtomicLong>> cacheSizeByTbl = new HashMap<>();
+ try {
+ if (!(reserve = locPart.reserve()))
+ throw new IgniteException("Can't reserve partition");
- GridIterator<CacheDataRow> partIter = grpCtx.offheap().partitionIterator(partId);
+ if (locPart.state() != OWNING)
+ throw new IgniteException("Partition not in state " + OWNING);
- GridQueryProcessor qryProcessor = ignite.context().query();
- IgniteH2Indexing h2Indexing = (IgniteH2Indexing)qryProcessor.getIndexing();
+ Map<Integer, Map<String, AtomicLong>> cacheSizeByTbl = new HashMap<>();
- while (partIter.hasNextX() && !failCalcCacheSizeGrpIds.contains(grpId)) {
- CacheDataRow cacheDataRow = partIter.nextX();
+ GridIterator<CacheDataRow> partIter = grpCtx.offheap().partitionIterator(partId);
- int cacheId = cacheDataRow.cacheId();
+ GridQueryProcessor qryProcessor = ignite.context().query();
+ IgniteH2Indexing h2Indexing = (IgniteH2Indexing)qryProcessor.getIndexing();
- GridCacheContext cacheCtx = cacheId == 0 ?
- grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(cacheId);
+ while (partIter.hasNextX() && !failCalcCacheSizeGrpIds.contains(grpId)) {
+ CacheDataRow cacheDataRow = partIter.nextX();
- if (cacheCtx == null)
- throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
+ int cacheId = cacheDataRow.cacheId();
- if (cacheDataRow.link() == 0L)
- throw new IgniteException("Contains invalid partition row, possibly deleted");
+ GridCacheContext cacheCtx = cacheId == 0 ?
+ grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(cacheId);
- String cacheName = cacheCtx.name();
+ if (cacheCtx == null)
+ throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
- QueryTypeDescriptorImpl qryTypeDesc = qryProcessor.typeByValue(
- cacheName,
- cacheCtx.cacheObjectContext(),
- cacheDataRow.key(),
- cacheDataRow.value(),
- true
- );
+ if (cacheDataRow.link() == 0L)
+ throw new IgniteException("Contains invalid partition row, possibly deleted");
- if (isNull(qryTypeDesc))
- continue; // Tolerate - (k, v) is just not indexed.
+ String cacheName = cacheCtx.name();
- String tableName = qryTypeDesc.tableName();
+ QueryTypeDescriptorImpl qryTypeDesc = qryProcessor.typeByValue(
+ cacheName,
+ cacheCtx.cacheObjectContext(),
+ cacheDataRow.key(),
+ cacheDataRow.value(),
+ true
+ );
- GridH2Table gridH2Tbl = h2Indexing.schemaManager().dataTable(cacheName, tableName);
+ if (isNull(qryTypeDesc))
+ continue; // Tolerate - (k, v) is just not indexed.
- if (isNull(gridH2Tbl))
- continue; // Tolerate - (k, v) is just not indexed.
+ String tableName = qryTypeDesc.tableName();
- cacheSizeByTbl.computeIfAbsent(cacheCtx.cacheId(), i -> new HashMap<>())
- .computeIfAbsent(tableName, s -> new AtomicLong()).incrementAndGet();
- }
+ GridH2Table gridH2Tbl = h2Indexing.schemaManager().dataTable(cacheName, tableName);
- PartitionUpdateCounter updateCntrAfter = locPart.dataStore().partUpdateCounter();
+ if (isNull(gridH2Tbl))
+ continue; // Tolerate - (k, v) is just not indexed.
- if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
- throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
- ", grpId=" + grpCtx.groupId() + ", partId=" + locPart.id() + "] changed during size " +
- "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
- }
+ cacheSizeByTbl.computeIfAbsent(cacheCtx.cacheId(), i -> new HashMap<>())
+ .computeIfAbsent(tableName, s -> new AtomicLong()).incrementAndGet();
+ }
+
+ PartitionUpdateCounter updateCntrAfter = locPart.dataStore().partUpdateCounter();
- return new CacheSize(null, cacheSizeByTbl);
+ if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
+ throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
+ ", grpId=" + grpCtx.groupId() + ", partId=" + locPart.id() + "] changed during size " +
+ "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
}
- catch (Throwable t) {
- IgniteException cacheSizeErr = new IgniteException("Cache size calculation error [" +
- cacheGrpInfo(grpCtx) + ", locParId=" + partId + ", err=" + t.getMessage() + "]", t);
- error(log, cacheSizeErr);
+ return new CacheSize(null, cacheSizeByTbl);
+ }
+ catch (Throwable t) {
+ IgniteException cacheSizeErr = new IgniteException("Cache size calculation error [" +
+ cacheGrpInfo(grpCtx) + ", locParId=" + partId + ", err=" + t.getMessage() + "]", t);
+
+ error(log, cacheSizeErr);
- failCalcCacheSizeGrpIds.add(grpId);
+ failCalcCacheSizeGrpIds.add(grpId);
- return new CacheSize(cacheSizeErr, null);
- }
- finally {
- if (reserve)
- locPart.release();
- }
+ return new CacheSize(cacheSizeErr, null);
}
finally {
- processedCacheSizePartitions.incrementAndGet();
-
- printProgressOfIndexValidationIfNeeded();
+ if (reserve)
+ locPart.release();
}
- });
+ }
+ finally {
+ processedCacheSizePartitions.incrementAndGet();
+
+ printProgressOfIndexValidationIfNeeded();
+ }
}
/**
* Asynchronous calculation of the index size for cache.
*
* @param cacheCtx Cache context.
- * @param idx Index.
+ * @param idx Index.
* @param idleChecker Idle check closure.
* @return Future with index size.
*/
@@ -1056,37 +1097,56 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
IgniteInClosure<Integer> idleChecker
) {
return calcExecutor.submit(() -> {
- try {
- if (failCalcCacheSizeGrpIds.contains(cacheCtx.groupId()))
- return new T2<>(null, 0L);
+ return calcIndexSize(cacheCtx, idx, idleChecker);
+ });
+ }
- String cacheName = cacheCtx.name();
- String tblName = idx.getTable().getName();
- String idxName = idx.getName();
+ /**
+ * Calculation of the index size for cache.
+ *
+ * @param cacheCtx Cache context.
+ * @param idx Index.
+ * @param idleChecker Idle check closure.
+ * @return Tuple containing the exception, if one occurred, and the size of the index.
+ */
+ private T2<Throwable, Long> calcIndexSize(
+ GridCacheContext cacheCtx,
+ Index idx,
+ IgniteInClosure<Integer> idleChecker
+ ) {
+ if (validateCtx.isCancelled())
+ return new T2<>(null, 0L);
- try {
- long indexSize = ignite.context().query().getIndexing().indexSize(cacheName, tblName, idxName);
+ try {
+ if (failCalcCacheSizeGrpIds.contains(cacheCtx.groupId()))
+ return new T2<>(null, 0L);
- idleChecker.apply(cacheCtx.groupId());
+ String cacheName = cacheCtx.name();
+ String tblName = idx.getTable().getName();
+ String idxName = idx.getName();
- return new T2<>(null, indexSize);
- }
- catch (Throwable t) {
- Throwable idxSizeErr = new IgniteException("Index size calculation error [" +
- cacheGrpInfo(cacheCtx.group()) + ", " + cacheInfo(cacheCtx) + ", tableName=" +
- tblName + ", idxName=" + idxName + ", err=" + t.getMessage() + "]", t);
+ try {
+ long indexSize = ignite.context().query().getIndexing().indexSize(cacheName, tblName, idxName);
- error(log, idxSizeErr);
+ idleChecker.apply(cacheCtx.groupId());
- return new T2<>(idxSizeErr, 0L);
- }
+ return new T2<>(null, indexSize);
}
- finally {
- processedIdxSizes.incrementAndGet();
+ catch (Throwable t) {
+ Throwable idxSizeErr = new IgniteException("Index size calculation error [" +
+ cacheGrpInfo(cacheCtx.group()) + ", " + cacheInfo(cacheCtx) + ", tableName=" +
+ tblName + ", idxName=" + idxName + ", err=" + t.getMessage() + "]", t);
+
+ error(log, idxSizeErr);
- printProgressOfIndexValidationIfNeeded();
+ return new T2<>(idxSizeErr, 0L);
}
- });
+ }
+ finally {
+ processedIdxSizes.incrementAndGet();
+
+ printProgressOfIndexValidationIfNeeded();
+ }
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesContext.java
new file mode 100644
index 0000000..48909d6
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.verify;
+
+/**
+ * Context of an index validation run, used to check whether the validation has been cancelled.
+ */
+public interface ValidateIndexesContext {
+ /**
+ * Returns whether the index validation has been cancelled.
+ *
+ * @return True if cancelled, otherwise false.
+ */
+ public boolean isCancelled();
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
index 08e85e5..0c385a9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -33,12 +34,15 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.annotation.InterruptibleVisorTask;
+import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
*
*/
@GridInternal
+@InterruptibleVisorTask
public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIndexesTaskArg,
VisorValidateIndexesTaskResult, VisorValidateIndexesJobResult> {
/** */
@@ -94,6 +98,10 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
/** */
private static final long serialVersionUID = 0L;
+ /** Injected logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
/**
* @param arg Argument.
* @param debug Debug.
@@ -106,6 +114,7 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
@Override protected VisorValidateIndexesJobResult run(@Nullable VisorValidateIndexesTaskArg arg) throws IgniteException {
try {
ValidateIndexesClosure clo = new ValidateIndexesClosure(
+ this::isCancelled,
arg.getCaches(),
arg.getCheckFirst(),
arg.getCheckThrough(),
@@ -118,11 +127,20 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
return clo.call();
}
catch (Exception e) {
+ cancel();
+
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
+ @Override public void cancel() {
+ log.warning("Index validation was cancelled.");
+
+ super.cancel();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(VisorValidateIndexesJob.class, this);
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index f768054..4ce570e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -179,6 +179,7 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
*/
private static void validateIndexes(IgniteEx node) throws Exception {
ValidateIndexesClosure clo = new ValidateIndexesClosure(
+ () -> false,
Collections.singleton(DEFAULT_CACHE_NAME),
0,
0,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
index 4253f40..a31ea8f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.internal.processors.cache.persistence.db;
import java.util.Arrays;
@@ -156,7 +157,8 @@ public class IndexingMultithreadedLoadContinuousRestartTest extends GridCommonAb
forceCheckpoint();
// Validate indexes on start.
- ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
+ ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false, Collections.singleton(CACHE_NAME),
+ 0, 0, false, true);
ignite.context().resource().injectGeneric(clo);
VisorValidateIndexesJobResult res = clo.call();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
index a263df8..d45ba0e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
@@ -109,7 +109,8 @@ public class IgniteClusterSnapshotWithIndexesTest extends AbstractSnapshotSelfTe
forceCheckpoint();
// Validate indexes on start.
- ValidateIndexesClosure clo = new ValidateIndexesClosure(new HashSet<>(Arrays.asList(indexedCcfg.getName(), tblName)),
+ ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false,
+ new HashSet<>(Arrays.asList(indexedCcfg.getName(), tblName)),
0, 0, false, true);
for (Ignite node : G.allGrids()) {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
index 386d731..5ed9649 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
@@ -186,7 +186,7 @@ public class RebuildIndexTest extends GridCommonAbstractTest {
enableCheckpoints(G.allGrids(), false);
// Validate indexes on start.
- ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
+ ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false, Collections.singleton(CACHE_NAME), 0, 0, false, true);
node.context().resource().injectGeneric(clo);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
index 75590b8..1bd2da7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
@@ -229,7 +229,7 @@ public class RebuildIndexWithHistoricalRebalanceTest extends GridCommonAbstractT
awaitPartitionMapExchange();
- ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
+ ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false, Collections.singleton(CACHE_NAME), 0, 0, false, true);
node2.context().resource().injectGeneric(clo);
VisorValidateIndexesJobResult res = clo.call();