You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/12/23 15:38:15 UTC

[ignite] branch master updated: IGNITE-13866 validate_indexes command is interrupted if connection to initiator is broken. Fixes #8593

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 873bb4f  IGNITE-13866 validate_indexes command is interrupted if connection to initiator is broken. Fixes #8593
873bb4f is described below

commit 873bb4f67716cf41fd1bc339a026dbeaf7fa664f
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Dec 23 18:37:33 2020 +0300

    IGNITE-13866 validate_indexes command is interrupted if connection to initiator is broken. Fixes #8593
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../testsuites/IgniteControlUtilityTestSuite.java  |   2 +
 .../GridCommandHandlerInterruptCommandTest.java    | 326 +++++++++++++++++++++
 .../rest/protocols/tcp/GridTcpRestNioListener.java | 104 ++++++-
 .../visor/annotation/InterruptibleVisorTask.java   |  32 ++
 .../internal/visor/verify/VisorIdleVerifyJob.java  |  13 +
 .../visor/verify/ValidateIndexesClosure.java       | 254 ++++++++++------
 .../visor/verify/ValidateIndexesContext.java       |  30 ++
 .../visor/verify/VisorValidateIndexesTask.java     |  18 ++
 .../IgnitePdsIndexingDefragmentationTest.java      |   1 +
 ...xingMultithreadedLoadContinuousRestartTest.java |   4 +-
 .../IgniteClusterSnapshotWithIndexesTest.java      |   3 +-
 .../processors/database/RebuildIndexTest.java      |   2 +-
 .../RebuildIndexWithHistoricalRebalanceTest.java   |   2 +-
 13 files changed, 684 insertions(+), 107 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index cac12d4..6466487 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassTest;
 import org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassWithSSLTest;
 import org.apache.ignite.util.GridCommandHandlerIndexingTest;
 import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest;
+import org.apache.ignite.util.GridCommandHandlerInterruptCommandTest;
 import org.apache.ignite.util.GridCommandHandlerMetadataTest;
 import org.apache.ignite.util.GridCommandHandlerPropertiesTest;
 import org.apache.ignite.util.GridCommandHandlerSslTest;
@@ -68,6 +69,7 @@ import org.junit.runners.Suite;
     GridCommandHandlerIndexingClusterByClassWithSSLTest.class,
     GridCommandHandlerIndexingCheckSizeTest.class,
     GridCommandHandlerCheckIndexesInlineSizeTest.class,
+    GridCommandHandlerInterruptCommandTest.class,
     GridCommandHandlerMetadataTest.class,
 
     KillCommandsCommandShTest.class,
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerInterruptCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerInterruptCommandTest.java
new file mode 100644
index 0000000..d750439
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerInterruptCommandTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DeploymentEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
+
+/**
+ * Checks cancel of execution validate_indexes command.
+ */
+public class GridCommandHandlerInterruptCommandTest extends GridCommandHandlerAbstractTest {
+    /** Load loop cycles. */
+    private static final int LOAD_LOOP = 500_000;
+
+    /** Idle verify task name. */
+    private static final String IDLE_VERIFY_TASK_V2 = "org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2";
+
+    /** Validate index task name. */
+    private static final String VALIDATE_INDEX_TASK = "org.apache.ignite.internal.visor.verify.VisorValidateIndexesTask";
+
+    /** Log listener. */
+    private ListeningTestLogger lnsrLog;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setGridLogger(lnsrLog)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setInitialSize(200L * 1024 * 1024)
+                    .setMaxSize(200L * 1024 * 1024)
+                )
+            )
+            .setIncludeEventTypes(EventType.EVT_TASK_DEPLOYED)
+            .setCacheConfiguration(new CacheConfiguration<Integer, UserValue>(DEFAULT_CACHE_NAME)
+                .setName(DEFAULT_CACHE_NAME)
+                .setQueryEntities(Collections.singleton(createQueryEntity())));
+    }
+
+    /**
+     * Creates predifened query entity.
+     *
+     * @return Query entity.
+     */
+    private QueryEntity createQueryEntity() {
+        QueryEntity qryEntity = new QueryEntity();
+        qryEntity.setKeyType(Integer.class.getTypeName());
+        qryEntity.setValueType(UserValue.class.getName());
+        qryEntity.setTableName("USER_TEST_TABLE");
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        fields.put("x", "java.lang.Integer");
+        fields.put("y", "java.lang.Integer");
+        fields.put("z", "java.lang.Integer");
+        qryEntity.setFields(fields);
+
+        LinkedHashMap<String, Boolean> idxFields = new LinkedHashMap<>();
+
+        QueryIndex idx2 = new QueryIndex();
+        idx2.setName("IDX_2");
+        idx2.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("x", false);
+        idx2.setFields(idxFields);
+
+        QueryIndex idx3 = new QueryIndex();
+        idx3.setName("IDX_3");
+        idx3.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("y", false);
+        idx3.setFields(idxFields);
+
+        QueryIndex idx4 = new QueryIndex();
+        idx4.setName("IDX_4");
+        idx4.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("z", false);
+        idx4.setFields(idxFields);
+
+        qryEntity.setIndexes(Arrays.asList(idx2, idx3, idx4));
+        return qryEntity;
+    }
+
+    /**
+     * User value.
+     */
+    private static class UserValue {
+        /** X. */
+        private int x;
+
+        /** Y. */
+        private int y;
+
+        /** Z. */
+        private int z;
+
+        /**
+         * @param x X.
+         * @param y Y.
+         * @param z Z.
+         */
+        public UserValue(int x, int y, int z) {
+            this.x = x;
+            this.y = y;
+            this.z = z;
+        }
+
+        /**
+         * @param seed Seed.
+         */
+        public UserValue(long seed) {
+            x = (int)(seed % 6991);
+            y = (int)(seed % 18679);
+            z = (int)(seed % 31721);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(UserValue.class, this);
+        }
+    }
+
+    /**
+     * Checks that validate_indexes command will cancel after it interrupted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testValidateIndexesCommand() throws Exception {
+        lnsrLog = new ListeningTestLogger(false, log);
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        preloadeData(ignite);
+
+        CountDownLatch startTaskLatch = waitForTaskEvent(ignite, VALIDATE_INDEX_TASK);
+
+        LogListener lnsrValidationCancelled = LogListener.matches("Index validation was cancelled.").build();
+
+        lnsrLog.registerListener(lnsrValidationCancelled);
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(() ->
+            assertSame(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "validate_indexes")));
+
+        startTaskLatch.await();
+
+        fut.cancel();
+
+        fut.get();
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            ignite.compute().activeTaskFutures().isEmpty(), 10_000));
+
+        assertTrue(GridTestUtils.waitForCondition(lnsrValidationCancelled::check, 10_000));
+    }
+
+    /**
+     * Checks that idle verify command will not cancel if initiator client interrupted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIdleVerifyCommand() throws Exception {
+        lnsrLog = new ListeningTestLogger(false, log);
+
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        preloadeData(ignite);
+
+        CountDownLatch startTaskLatch = waitForTaskEvent(ignite, IDLE_VERIFY_TASK_V2);
+
+        LogListener lnsrValidationCancelled = LogListener.matches("Idle verify was cancelled.").build();
+
+        lnsrLog.registerListener(lnsrValidationCancelled);
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(() ->
+            assertSame(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "idle_verify")));
+
+        startTaskLatch.await();
+
+        fut.cancel();
+
+        fut.get();
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            ignite.compute().activeTaskFutures().isEmpty(), 30_000));
+
+        assertFalse(lnsrValidationCancelled.check());
+    }
+
+    /**
+     * Method subscribe on task event and return a latch for waiting.
+     *
+     * @param ignite Ignite.
+     * @param taskName Task name.
+     * @return Latch which will open after event received.
+     */
+
+    private CountDownLatch waitForTaskEvent(IgniteEx ignite, String taskName) {
+        CountDownLatch startTaskLatch = new CountDownLatch(1);
+
+        ignite.events().localListen((evt) -> {
+            assertTrue(evt instanceof DeploymentEvent);
+
+            if (taskName.equals(((DeploymentEvent)evt).alias())) {
+                startTaskLatch.countDown();
+
+                return false;
+            }
+
+            return true;
+        }, EventType.EVT_TASK_DEPLOYED);
+        return startTaskLatch;
+    }
+
+    /**
+     * Preload data to default cache.
+     *
+     * @param ignite Ignite.
+     */
+    private void preloadeData(IgniteEx ignite) {
+        try (IgniteDataStreamer streamr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+            for (int i = 0; i < LOAD_LOOP; i++)
+                streamr.addData(i, new UserValue(i));
+        }
+    }
+
+    /**
+     * Test invokes index validation closure and canceling it after started.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelValidateIndexesClosure() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+
+        ignite0.cluster().active(true);
+
+        preloadeData(ignite0);
+
+        AtomicBoolean cancelled = new AtomicBoolean(false);
+
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(cancelled::get, Collections.singleton(DEFAULT_CACHE_NAME),
+            0, 0, false, true);
+
+        ListeningTestLogger listeningLogger = new ListeningTestLogger(false, log);
+
+        GridTestUtils.setFieldValue(clo, "ignite", ignite0);
+        GridTestUtils.setFieldValue(clo, "log", listeningLogger);
+
+        LogListener lnsrValidationStarted = LogListener.matches("Current progress of ValidateIndexesClosure").build();
+
+        listeningLogger.registerListener(lnsrValidationStarted);
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(() ->
+            GridTestUtils.assertThrows(log, clo::call, IgniteException.class, ValidateIndexesClosure.CANCELLED_MSG));
+
+        assertTrue(GridTestUtils.waitForCondition(lnsrValidationStarted::check, 10_000));
+
+        assertFalse(fut.isDone());
+
+        cancelled.set(true);
+
+        fut.get(10_000);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index 6195460..aace013 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -23,6 +23,8 @@ import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -67,6 +69,7 @@ import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.annotation.InterruptibleVisorTask;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_APPEND;
@@ -159,6 +162,10 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
     /** Handler for all Redis requests. */
     private GridRedisNioListener redisLsnr;
 
+    /** Futures of currently executing tasks that can be interrupted if their session is closed
+     * (e.g. because of user interrupting control.sh operation). */
+    private final Map<GridNioSession, Set<IgniteInternalFuture>> sesInterruptibleFutMap = new ConcurrentHashMap<>();
+
     /**
      * Creates listener which will convert incoming tcp packets to rest requests and forward them to
      * a given rest handler.
@@ -196,6 +203,8 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
 
     /** {@inheritDoc} */
     @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+        onSessionClosed(ses);
+
         if (e != null) {
             if (e instanceof RuntimeException)
                 U.error(log, "Failed to process request from remote client: " + ses, e);
@@ -222,7 +231,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
                     ", ver=" + ver +
                     ", supported=" + SUPP_VERS + ']');
 
-                ses.close();
+                onSessionClosed(ses);
             }
             else {
                 byte marshId = hs.marshallerId();
@@ -233,7 +242,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
                     } catch (IgniteInterruptedCheckedException e) {
                         U.error(log, "Marshaller is not initialized.", e);
 
-                        ses.close();
+                        onSessionClosed(ses);
 
                         return;
                     }
@@ -245,7 +254,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
                     U.error(log, "Client marshaller ID is invalid. Note that .NET and C++ clients " +
                         "are supported only in enterprise edition [ses=" + ses + ", marshId=" + marshId + ']');
 
-                    ses.close();
+                    onSessionClosed(ses);
                 }
                 else {
                     ses.addMeta(MARSHALLER.ordinal(), marsh);
@@ -257,9 +266,16 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
         else {
             final GridRestRequest req = createRestRequest(ses, msg);
 
-            if (req != null)
-                hnd.handleAsync(req).listen(new CI1<IgniteInternalFuture<GridRestResponse>>() {
+            if (req != null) {
+                IgniteInternalFuture<GridRestResponse> taskFut = hnd.handleAsync(req);
+
+                if (isInterruptible(msg))
+                    addFutureToSession(ses, taskFut);
+
+                taskFut.listen(new CI1<IgniteInternalFuture<GridRestResponse>>() {
                     @Override public void apply(IgniteInternalFuture<GridRestResponse> fut) {
+                        removeFutureFromSession(ses, taskFut);
+
                         GridClientResponse res = new GridClientResponse();
 
                         res.requestId(msg.requestId());
@@ -303,6 +319,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
                         });
                     }
                 });
+            }
             else
                 U.error(log, "Failed to process client request (unknown packet type) [ses=" + ses +
                     ", msg=" + msg + ']');
@@ -310,6 +327,81 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
     }
 
     /**
+     * This method checks if request in client message can be interrupted.
+     *
+     * @param msg Message.
+     * @return True of task can be interrupted, false otherwise.
+     */
+    private boolean isInterruptible(GridClientMessage msg) {
+        if (!(msg instanceof GridClientTaskRequest))
+            return false;
+
+        GridClientTaskRequest taskRequest = (GridClientTaskRequest)msg;
+
+        try {
+            return U.hasAnnotation(U.forName(taskRequest.taskName(), null), InterruptibleVisorTask.class);
+        }
+        catch (ClassNotFoundException e) {
+            log.warning("Task closure can't be found: [task=" + taskRequest.taskName() + ']', e);
+
+            return false;
+        }
+    }
+
+    /**
+     * Memorize operation future until it was not completed.
+     *
+     * @param ses Session.
+     * @param fut Operation future.
+     */
+    private void addFutureToSession(GridNioSession ses, IgniteInternalFuture fut) {
+        sesInterruptibleFutMap.computeIfAbsent(ses, key ->
+            ConcurrentHashMap.newKeySet())
+            .add(fut);
+    }
+
+    /**
+     * Remove completed future from internal structure.
+     *
+     * @param ses Session.
+     * @param fut Operation future.
+     */
+    private void removeFutureFromSession(GridNioSession ses, IgniteInternalFuture fut) {
+        assert fut.isDone() : "Operation is in progress, session: " + ses;
+
+        sesInterruptibleFutMap.computeIfPresent(ses, (key, futs) -> {
+            futs.remove(fut);
+
+            return futs;
+        });
+    }
+
+    /**
+     * Close all future associated with given session.
+     *
+     * @param ses Session.
+     */
+    public void onSessionClosed(GridNioSession ses) {
+        sesInterruptibleFutMap.computeIfPresent(ses, (key, pendingFuts) -> {
+            for (IgniteInternalFuture fut : pendingFuts) {
+                try {
+                    if (!fut.isDone())
+                        fut.cancel();
+                }
+                catch (IgniteCheckedException e) {
+                    log.warning("Future was not cancelled: " + fut, e);
+                }
+            }
+
+            return pendingFuts;
+        });
+
+        sesInterruptibleFutMap.remove(ses);
+
+        ses.close();
+    }
+
+    /**
      * Creates a REST request object from client TCP binary packet.
      *
      * @param ses NIO session.
@@ -459,6 +551,6 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
      * @param ses Session, that was inactive.
      */
     @Override public void onSessionIdleTimeout(GridNioSession ses) {
-        ses.close();
+        onSessionClosed(ses);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/annotation/InterruptibleVisorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/annotation/InterruptibleVisorTask.java
new file mode 100644
index 0000000..09439b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/annotation/InterruptibleVisorTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation using for Visor's task which can interrupted by system reason.
+ * For example, when a connection between cluster and task initiator is breaking.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface InterruptibleVisorTask {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
index d4652cc..ceeb717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.visor.verify;
 
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskFuture;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoggerResource;
 
 /**
  *
@@ -41,6 +43,10 @@ class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, Resul
     @JobContextResource
     protected transient ComputeJobContext jobCtx;
 
+    /** Injected logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
     /** Task class for execution */
     private final Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls;
 
@@ -76,6 +82,13 @@ class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, Resul
     }
 
     /** {@inheritDoc} */
+    @Override public void cancel() {
+        log.warning("Idle verify was cancelled.");
+
+        super.cancel();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(VisorIdleVerifyJob.class, this);
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index c83d62c..64c17eb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -113,6 +113,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Exception message throwing when closure was cancelled. */
+    public static final String CANCELLED_MSG = "Closure of index validation was cancelled.";
+
     /** Ignite. */
     @IgniteInstanceResource
     private transient IgniteEx ignite;
@@ -169,16 +172,28 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
     /** Group cache ids when calculating cache size was an error. */
     private final Set<Integer> failCalcCacheSizeGrpIds = newSetFromMap(new ConcurrentHashMap<>());
 
+    /** Validate index context. */
+    private final ValidateIndexesContext validateCtx;
+
     /**
      * Constructor.
      *
+     * @param validateCtx Context of validate index closure.
      * @param cacheNames Cache names.
      * @param checkFirst If positive only first K elements will be validated.
      * @param checkThrough If positive only each Kth element will be validated.
      * @param checkCrc Check CRC sum on stored pages on disk.
      * @param checkSizes Check that index size and cache size are same.
      */
-    public ValidateIndexesClosure(Set<String> cacheNames, int checkFirst, int checkThrough, boolean checkCrc, boolean checkSizes) {
+    public ValidateIndexesClosure(
+        ValidateIndexesContext validateCtx,
+        Set<String> cacheNames,
+        int checkFirst,
+        int checkThrough,
+        boolean checkCrc,
+        boolean checkSizes
+    ) {
+        this.validateCtx = validateCtx;
         this.cacheNames = cacheNames;
         this.checkFirst = checkFirst;
         this.checkThrough = checkThrough;
@@ -241,6 +256,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
      *
      */
     private VisorValidateIndexesJobResult call0() {
+        if (validateCtx.isCancelled())
+            throw new IgniteException(CANCELLED_MSG);
+
         Set<Integer> grpIds = collectGroupIds();
 
         /** Update counters per partition per group. */
@@ -393,6 +411,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
             throw unwrapFutureException(e);
         }
 
+        if (validateCtx.isCancelled())
+            throw new IgniteException(CANCELLED_MSG);
+
         return new VisorValidateIndexesJobResult(
             partResults,
             idxResults,
@@ -514,7 +535,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         CacheGroupContext grpCtx,
         GridDhtLocalPartition part
     ) {
-        if (!part.reserve())
+        if (validateCtx.isCancelled() || !part.reserve())
             return emptyMap();
 
         ValidateIndexesPartitionResult partRes;
@@ -607,9 +628,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         long current = 0;
         long processedNumber = 0;
 
-        while (it.hasNextX()) {
-            if (enoughIssues)
-                break;
+            while (it.hasNextX() && !validateCtx.isCancelled()) {
+                if (enoughIssues)
+                    break;
 
             CacheDataRow row = it.nextX();
 
@@ -677,9 +698,12 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
             ArrayList<Index> indexes = gridH2Tbl.getIndexes();
 
-            for (Index idx : indexes) {
-                if (!(idx instanceof H2TreeIndexBase))
-                    continue;
+                for (Index idx : indexes) {
+                    if (validateCtx.isCancelled())
+                        break;
+
+                    if (!(idx instanceof H2TreeIndexBase))
+                        continue;
 
                 try {
                     Cursor cursor = idx.find(session, h2Row, h2Row);
@@ -792,6 +816,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         T2<GridCacheContext, Index> cacheCtxWithIdx,
         IgniteInClosure<Integer> idleChecker
     ) {
+        if (validateCtx.isCancelled())
+            return emptyMap();
+
         GridCacheContext ctx = cacheCtxWithIdx.get1();
 
         Index idx = cacheCtxWithIdx.get2();
@@ -826,7 +853,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
         KeyCacheObject previousKey = null;
 
-        while (!enoughIssues) {
+        while (!enoughIssues && !validateCtx.isCancelled()) {
             KeyCacheObject h2key = null;
 
             try {
@@ -943,110 +970,124 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         GridDhtLocalPartition locPart
     ) {
         return calcExecutor.submit(() -> {
-            try {
-                @Nullable PartitionUpdateCounter updCntr = locPart.dataStore().partUpdateCounter();
+            return calcCacheSize(grpCtx, locPart);
+        });
+    }
 
-                PartitionUpdateCounter updateCntrBefore = updCntr == null ? updCntr : updCntr.copy();
+    /**
+     * Calculation of caches size with divided by tables.
+     *
+     * @param grpCtx Cache group context.
+     * @param locPart Local partition.
+     * @return Cache size representation object.
+     */
+    private CacheSize calcCacheSize(CacheGroupContext grpCtx,GridDhtLocalPartition locPart) {
+        try {
+            if (validateCtx.isCancelled())
+                return new CacheSize(null, emptyMap());
 
-                int grpId = grpCtx.groupId();
+            @Nullable PartitionUpdateCounter updCntr = locPart.dataStore().partUpdateCounter();
 
-                if (failCalcCacheSizeGrpIds.contains(grpId))
-                    return new CacheSize(null, null);
+            PartitionUpdateCounter updateCntrBefore = updCntr == null ? updCntr : updCntr.copy();
 
-                boolean reserve = false;
+            int grpId = grpCtx.groupId();
 
-                int partId = locPart.id();
+            if (failCalcCacheSizeGrpIds.contains(grpId))
+                return new CacheSize(null, null);
 
-                try {
-                    if (!(reserve = locPart.reserve()))
-                        throw new IgniteException("Can't reserve partition");
+            boolean reserve = false;
 
-                    if (locPart.state() != OWNING)
-                        throw new IgniteException("Partition not in state " + OWNING);
+            int partId = locPart.id();
 
-                    Map<Integer, Map<String, AtomicLong>> cacheSizeByTbl = new HashMap<>();
+            try {
+                if (!(reserve = locPart.reserve()))
+                    throw new IgniteException("Can't reserve partition");
 
-                    GridIterator<CacheDataRow> partIter = grpCtx.offheap().partitionIterator(partId);
+                if (locPart.state() != OWNING)
+                    throw new IgniteException("Partition not in state " + OWNING);
 
-                    GridQueryProcessor qryProcessor = ignite.context().query();
-                    IgniteH2Indexing h2Indexing = (IgniteH2Indexing)qryProcessor.getIndexing();
+                Map<Integer, Map<String, AtomicLong>> cacheSizeByTbl = new HashMap<>();
 
-                    while (partIter.hasNextX() && !failCalcCacheSizeGrpIds.contains(grpId)) {
-                        CacheDataRow cacheDataRow = partIter.nextX();
+                GridIterator<CacheDataRow> partIter = grpCtx.offheap().partitionIterator(partId);
 
-                        int cacheId = cacheDataRow.cacheId();
+                GridQueryProcessor qryProcessor = ignite.context().query();
+                IgniteH2Indexing h2Indexing = (IgniteH2Indexing)qryProcessor.getIndexing();
 
-                        GridCacheContext cacheCtx = cacheId == 0 ?
-                            grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(cacheId);
+                while (partIter.hasNextX() && !failCalcCacheSizeGrpIds.contains(grpId)) {
+                    CacheDataRow cacheDataRow = partIter.nextX();
 
-                        if (cacheCtx == null)
-                            throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
+                    int cacheId = cacheDataRow.cacheId();
 
-                        if (cacheDataRow.link() == 0L)
-                            throw new IgniteException("Contains invalid partition row, possibly deleted");
+                    GridCacheContext cacheCtx = cacheId == 0 ?
+                        grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(cacheId);
 
-                        String cacheName = cacheCtx.name();
+                    if (cacheCtx == null)
+                        throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
 
-                        QueryTypeDescriptorImpl qryTypeDesc = qryProcessor.typeByValue(
-                            cacheName,
-                            cacheCtx.cacheObjectContext(),
-                            cacheDataRow.key(),
-                            cacheDataRow.value(),
-                            true
-                        );
+                    if (cacheDataRow.link() == 0L)
+                        throw new IgniteException("Contains invalid partition row, possibly deleted");
 
-                        if (isNull(qryTypeDesc))
-                            continue; // Tolerate - (k, v) is just not indexed.
+                    String cacheName = cacheCtx.name();
 
-                        String tableName = qryTypeDesc.tableName();
+                    QueryTypeDescriptorImpl qryTypeDesc = qryProcessor.typeByValue(
+                        cacheName,
+                        cacheCtx.cacheObjectContext(),
+                        cacheDataRow.key(),
+                        cacheDataRow.value(),
+                        true
+                    );
 
-                        GridH2Table gridH2Tbl = h2Indexing.schemaManager().dataTable(cacheName, tableName);
+                    if (isNull(qryTypeDesc))
+                        continue; // Tolerate - (k, v) is just not indexed.
 
-                        if (isNull(gridH2Tbl))
-                            continue; // Tolerate - (k, v) is just not indexed.
+                    String tableName = qryTypeDesc.tableName();
 
-                        cacheSizeByTbl.computeIfAbsent(cacheCtx.cacheId(), i -> new HashMap<>())
-                            .computeIfAbsent(tableName, s -> new AtomicLong()).incrementAndGet();
-                    }
+                    GridH2Table gridH2Tbl = h2Indexing.schemaManager().dataTable(cacheName, tableName);
 
-                    PartitionUpdateCounter updateCntrAfter = locPart.dataStore().partUpdateCounter();
+                    if (isNull(gridH2Tbl))
+                        continue; // Tolerate - (k, v) is just not indexed.
 
-                    if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
-                        throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
-                            ", grpId=" + grpCtx.groupId() + ", partId=" + locPart.id() + "] changed during size " +
-                            "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
-                    }
+                    cacheSizeByTbl.computeIfAbsent(cacheCtx.cacheId(), i -> new HashMap<>())
+                        .computeIfAbsent(tableName, s -> new AtomicLong()).incrementAndGet();
+                }
+
+                PartitionUpdateCounter updateCntrAfter = locPart.dataStore().partUpdateCounter();
 
-                    return new CacheSize(null, cacheSizeByTbl);
+                if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
+                    throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
+                        ", grpId=" + grpCtx.groupId() + ", partId=" + locPart.id() + "] changed during size " +
+                        "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
                 }
-                catch (Throwable t) {
-                    IgniteException cacheSizeErr = new IgniteException("Cache size calculation error [" +
-                        cacheGrpInfo(grpCtx) + ", locParId=" + partId + ", err=" + t.getMessage() + "]", t);
 
-                    error(log, cacheSizeErr);
+                return new CacheSize(null, cacheSizeByTbl);
+            }
+            catch (Throwable t) {
+                IgniteException cacheSizeErr = new IgniteException("Cache size calculation error [" +
+                    cacheGrpInfo(grpCtx) + ", locParId=" + partId + ", err=" + t.getMessage() + "]", t);
+
+                error(log, cacheSizeErr);
 
-                    failCalcCacheSizeGrpIds.add(grpId);
+                failCalcCacheSizeGrpIds.add(grpId);
 
-                    return new CacheSize(cacheSizeErr, null);
-                }
-                finally {
-                    if (reserve)
-                        locPart.release();
-                }
+                return new CacheSize(cacheSizeErr, null);
             }
             finally {
-                processedCacheSizePartitions.incrementAndGet();
-
-                printProgressOfIndexValidationIfNeeded();
+                if (reserve)
+                    locPart.release();
             }
-        });
+        }
+        finally {
+            processedCacheSizePartitions.incrementAndGet();
+
+            printProgressOfIndexValidationIfNeeded();
+        }
     }
 
     /**
      * Asynchronous calculation of the index size for cache.
      *
      * @param cacheCtx Cache context.
-     * @param idx      Index.
+     * @param idx Index.
      * @param idleChecker Idle check closure.
      * @return Future with index size.
      */
@@ -1056,37 +1097,56 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         IgniteInClosure<Integer> idleChecker
     ) {
         return calcExecutor.submit(() -> {
-            try {
-                if (failCalcCacheSizeGrpIds.contains(cacheCtx.groupId()))
-                    return new T2<>(null, 0L);
+            return calcIndexSize(cacheCtx, idx, idleChecker);
+        });
+    }
 
-                String cacheName = cacheCtx.name();
-                String tblName = idx.getTable().getName();
-                String idxName = idx.getName();
+    /**
+     * Calculation of the index size for cache.
+     *
+     * @param cacheCtx Cache context.
+     * @param idx Index.
+     * @param idleChecker Idle check closure.
+     * @return Tuple contains exception if it happened and size of index.
+     */
+    private T2<Throwable, Long> calcIndexSize(
+        GridCacheContext cacheCtx,
+        Index idx,
+        IgniteInClosure<Integer> idleChecker
+    ) {
+        if (validateCtx.isCancelled())
+            return new T2<>(null, 0L);
 
-                try {
-                    long indexSize = ignite.context().query().getIndexing().indexSize(cacheName, tblName, idxName);
+        try {
+            if (failCalcCacheSizeGrpIds.contains(cacheCtx.groupId()))
+                return new T2<>(null, 0L);
 
-                    idleChecker.apply(cacheCtx.groupId());
+            String cacheName = cacheCtx.name();
+            String tblName = idx.getTable().getName();
+            String idxName = idx.getName();
 
-                    return new T2<>(null, indexSize);
-                }
-                catch (Throwable t) {
-                    Throwable idxSizeErr = new IgniteException("Index size calculation error [" +
-                        cacheGrpInfo(cacheCtx.group()) + ", " + cacheInfo(cacheCtx) + ", tableName=" +
-                        tblName + ", idxName=" + idxName + ", err=" + t.getMessage() + "]", t);
+            try {
+                long indexSize = ignite.context().query().getIndexing().indexSize(cacheName, tblName, idxName);
 
-                    error(log, idxSizeErr);
+                idleChecker.apply(cacheCtx.groupId());
 
-                    return new T2<>(idxSizeErr, 0L);
-                }
+                return new T2<>(null, indexSize);
             }
-            finally {
-                processedIdxSizes.incrementAndGet();
+            catch (Throwable t) {
+                Throwable idxSizeErr = new IgniteException("Index size calculation error [" +
+                    cacheGrpInfo(cacheCtx.group()) + ", " + cacheInfo(cacheCtx) + ", tableName=" +
+                    tblName + ", idxName=" + idxName + ", err=" + t.getMessage() + "]", t);
+
+                error(log, idxSizeErr);
 
-                printProgressOfIndexValidationIfNeeded();
+                return new T2<>(idxSizeErr, 0L);
             }
-        });
+        }
+        finally {
+            processedIdxSizes.incrementAndGet();
+
+            printProgressOfIndexValidationIfNeeded();
+        }
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesContext.java
new file mode 100644
index 0000000..48909d6
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.verify;
+
+/**
+ * Validate indexes context.
+ */
+public interface ValidateIndexesContext {
+    /**
+     * Returns a boolean value meaning whether the check is canceled or not.
+     *
+     * @return True if cancelled, otherwise false.
+     */
+    public boolean isCancelled();
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
index 08e85e5..0c385a9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -33,12 +34,15 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorMultiNodeTask;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.annotation.InterruptibleVisorTask;
+import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 @GridInternal
+@InterruptibleVisorTask
 public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIndexesTaskArg,
     VisorValidateIndexesTaskResult, VisorValidateIndexesJobResult> {
     /** */
@@ -94,6 +98,10 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
         /**
          * @param arg Argument.
          * @param debug Debug.
@@ -106,6 +114,7 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
         @Override protected VisorValidateIndexesJobResult run(@Nullable VisorValidateIndexesTaskArg arg) throws IgniteException {
             try {
                 ValidateIndexesClosure clo = new ValidateIndexesClosure(
+                    this::isCancelled,
                     arg.getCaches(),
                     arg.getCheckFirst(),
                     arg.getCheckThrough(),
@@ -118,11 +127,20 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
                 return clo.call();
             }
             catch (Exception e) {
+                cancel();
+
                 throw new IgniteException(e);
             }
         }
 
         /** {@inheritDoc} */
+        @Override public void cancel() {
+            log.warning("Index validation was cancelled.");
+
+            super.cancel();
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(VisorValidateIndexesJob.class, this);
         }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index f768054..4ce570e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -179,6 +179,7 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
      */
     private static void validateIndexes(IgniteEx node) throws Exception {
         ValidateIndexesClosure clo = new ValidateIndexesClosure(
+            () -> false,
             Collections.singleton(DEFAULT_CACHE_NAME),
             0,
             0,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
index 4253f40..a31ea8f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
@@ -14,6 +14,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+
 package org.apache.ignite.internal.processors.cache.persistence.db;
 
 import java.util.Arrays;
@@ -156,7 +157,8 @@ public class IndexingMultithreadedLoadContinuousRestartTest extends GridCommonAb
             forceCheckpoint();
 
             // Validate indexes on start.
-            ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
+            ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false, Collections.singleton(CACHE_NAME),
+                0, 0, false, true);
             ignite.context().resource().injectGeneric(clo);
             VisorValidateIndexesJobResult res = clo.call();
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
index a263df8..d45ba0e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
@@ -109,7 +109,8 @@ public class IgniteClusterSnapshotWithIndexesTest extends AbstractSnapshotSelfTe
         forceCheckpoint();
 
         // Validate indexes on start.
-        ValidateIndexesClosure clo = new ValidateIndexesClosure(new HashSet<>(Arrays.asList(indexedCcfg.getName(), tblName)),
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false,
+            new HashSet<>(Arrays.asList(indexedCcfg.getName(), tblName)),
             0, 0, false, true);
 
         for (Ignite node : G.allGrids()) {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
index 386d731..5ed9649 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexTest.java
@@ -186,7 +186,7 @@ public class RebuildIndexTest extends GridCommonAbstractTest {
         enableCheckpoints(G.allGrids(), false);
 
         // Validate indexes on start.
-        ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false, Collections.singleton(CACHE_NAME), 0, 0, false, true);
 
         node.context().resource().injectGeneric(clo);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
index 75590b8..1bd2da7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithHistoricalRebalanceTest.java
@@ -229,7 +229,7 @@ public class RebuildIndexWithHistoricalRebalanceTest extends GridCommonAbstractT
 
         awaitPartitionMapExchange();
 
-        ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0, false, true);
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(() -> false, Collections.singleton(CACHE_NAME), 0, 0, false, true);
         node2.context().resource().injectGeneric(clo);
         VisorValidateIndexesJobResult res = clo.call();