You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2022/08/17 16:54:12 UTC

[ignite] branch master updated: IGNITE-17531 Tests covers consistency repair at inconsistent cluster with nonfinalized counters (#10199)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f15c2d4451 IGNITE-17531 Tests covers consistency repair at inconsistent cluster with nonfinalized counters (#10199)
5f15c2d4451 is described below

commit 5f15c2d44518f52159731a116721ce963881fcb8
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Wed Aug 17 19:54:05 2022 +0300

    IGNITE-17531 Tests covers consistency repair at inconsistent cluster with nonfinalized counters (#10199)
---
 .../testsuites/IgniteControlUtilityTestSuite.java  |   2 +
 .../GridCommandHandlerConsistencyCountersTest.java | 586 +++++++++++++++++++++
 .../apache/ignite/util/GridCommandHandlerTest.java | 162 ------
 .../cache/PartitionUpdateCounterMvccImpl.java      |   4 +-
 .../cache/PartitionUpdateCounterTrackingImpl.java  |  96 ++--
 ...IgnitePdsSpuriousRebalancingOnNodeJoinTest.java |   2 +-
 .../apache/ignite/testframework/GridTestUtils.java |   8 +-
 7 files changed, 644 insertions(+), 216 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index efbd4ce577d..d102c06ce9f 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
+import org.apache.ignite.util.GridCommandHandlerConsistencyCountersTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessAtomicTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyRepairCorrectnessTransactionalTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest;
@@ -98,6 +99,7 @@ import org.junit.runners.Suite;
     GridCommandHandlerDefragmentationTest.class,
 
     GridCommandHandlerConsistencyTest.class,
+    GridCommandHandlerConsistencyCountersTest.class,
     GridCommandHandlerConsistencyBinaryTest.class,
     GridCommandHandlerConsistencySensitiveTest.class,
     GridCommandHandlerConsistencyRepairCorrectnessTransactionalTest.class,
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyCountersTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyCountersTest.java
new file mode 100644
index 00000000000..140a0f0000e
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyCountersTest.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.ReadRepairStrategy;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.commandline.consistency.ConsistencyCommand;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.transactions.TransactionHeuristicException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.ReadRepairStrategy.PRIMARY;
+import static org.apache.ignite.cache.ReadRepairStrategy.RELATIVE_MAJORITY;
+import static org.apache.ignite.cache.ReadRepairStrategy.REMOVE;
+import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE;
+import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
+import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.NOTHING_FOUND;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
+import static org.apache.ignite.testframework.LogListener.matches;
+
+/**
+ * Checks consistency repair at an inconsistent cluster with non-finalized partition update counters.
+ */
+@RunWith(Parameterized.class)
+public class GridCommandHandlerConsistencyCountersTest extends GridCommandHandlerClusterPerMethodAbstractTest {
+    /** @return All combinations of read repair strategy, key reuse flag, historical rebalance flag and atomicity mode. */
+    @Parameterized.Parameters(name = "strategy={0}, reuse={1}, historical={2}, atomicity={3}")
+    public static Iterable<Object[]> data() {
+        List<Object[]> res = new ArrayList<>();
+
+        for (ReadRepairStrategy strategy : ReadRepairStrategy.values()) {
+            for (boolean reuse : new boolean[] {false, true}) {
+                for (boolean historical : new boolean[] {false, true}) {
+                    for (CacheAtomicityMode atomicityMode : new CacheAtomicityMode[] {ATOMIC, TRANSACTIONAL})
+                        res.add(new Object[] {strategy, reuse, historical, atomicityMode});
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Read Repair strategy used at the first repair attempt.
+     */
+    @Parameterized.Parameter
+    public ReadRepairStrategy strategy;
+
+    /**
+     * When true, updates will reuse already existing keys.
+     */
+    @Parameterized.Parameter(1)
+    public boolean reuseKeys;
+
+    /**
+     * When true, historical rebalance will be used instead of full.
+     */
+    @Parameterized.Parameter(2)
+    public boolean historical;
+
+    /**
+     * Cache atomicity mode.
+     */
+    @Parameterized.Parameter(3)
+    public CacheAtomicityMode atomicityMode;
+
+    /** Listening logger. */
+    protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+    /** File IO blocked flag: while set, writes made through {@link BlockableFileIOFactory} throw {@link IOException}. */
+    private static volatile boolean ioBlocked;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir(); // Start from a clean persistence directory.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setGridLogger(listeningLog); // Lets the test register listeners for node log messages.
+
+        cfg.getDataStorageConfiguration().setFileIOFactory(
+            new BlockableFileIOFactory(cfg.getDataStorageConfiguration().getFileIOFactory()));
+
+        cfg.getDataStorageConfiguration().setWalMode(WALMode.FSYNC); // Allows the special IO to be used for WAL as well.
+
+        cfg.setFailureHandler(new StopNodeFailureHandler()); // Helps to kill nodes on stop with disabled IO.
+
+        return cfg;
+    }
+
+    /**
+     * File IO factory decorator: writes of the created IO throw {@link IOException} while {@code ioBlocked} is set.
+     */
+    private static class BlockableFileIOFactory implements FileIOFactory {
+        /** IO Factory. */
+        private final FileIOFactory factory;
+
+        /**
+         * @param factory Factory.
+         */
+        public BlockableFileIOFactory(FileIOFactory factory) {
+            this.factory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            return new FileIODecorator(factory.create(file, modes)) {
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
+                    if (ioBlocked)
+                        throw new IOException();
+
+                    return super.write(srcBuf);
+                }
+
+                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+                    if (ioBlocked)
+                        throw new IOException();
+
+                    return super.write(srcBuf, position);
+                }
+
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
+                    if (ioBlocked)
+                        throw new IOException();
+
+                    return super.write(buf, off, len);
+                }
+            };
+        }
+    }
+
+    /**
+     * Checks counters and behaviour on crash recovery.
+     */
+    @Test
+    public void testCountersOnCrachRecovery() throws Exception {
+        int nodes = 3;
+        int backupNodes = nodes - 1;
+
+        IgniteEx ignite = startGrids(nodes);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setBackups(backupNodes)
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicityMode)
+            .setWriteSynchronizationMode(FULL_SYNC) // Allows to be sure that all messages are sent when put succeeds.
+            .setReadFromBackup(true)); // Allows to check values on backups.
+
+        int updateCnt = 0;
+
+        // Initial preloading.
+        for (int i = 0; i < 2_000; i++) { // Enough to have historical rebalance when needed.
+            cache.put(i, i);
+
+            updateCnt++;
+        }
+
+        // Trick to have historical rebalance on cluster recovery (decreases percent of updates in comparison to cache size).
+        if (historical) {
+            stopAllGrids();
+            startGrids(nodes);
+        }
+
+        int preloadCnt = updateCnt;
+
+        Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+        List<Ignite> backups = backupNodes(0L, DEFAULT_CACHE_NAME);
+
+        AtomicBoolean prepareBlock = new AtomicBoolean();
+        AtomicBoolean finishBlock = new AtomicBoolean();
+
+        AtomicReference<CountDownLatch> blockLatch = new AtomicReference<>();
+
+        TestRecordingCommunicationSpi.spi(prim).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if ((msg instanceof GridDhtTxPrepareRequest && prepareBlock.get()) ||
+                    ((msg instanceof GridDhtTxFinishRequest || msg instanceof GridDhtAtomicSingleUpdateRequest) && finishBlock.get())) {
+                    CountDownLatch latch = blockLatch.get();
+
+                    assertTrue(latch.getCount() > 0);
+
+                    latch.countDown();
+
+                    return true; // Generating counter misses.
+                }
+                else
+                    return false;
+            }
+        });
+
+        int blockedFrom = reuseKeys ? 0 : preloadCnt;
+        int committedFrom = blockedFrom + 1_000;
+
+        int blockedKey = blockedFrom - 1; // None
+        int committedKey = committedFrom - 1; // None
+
+        List<String> backupMissed = new ArrayList<>();
+        List<String> primaryMissed = new ArrayList<>();
+
+        IgniteCache<Integer, Integer> primCache = prim.cache(DEFAULT_CACHE_NAME);
+
+        Consumer<Integer> cachePut = (key) -> primCache.put(key, -key);
+
+        GridCompoundFuture<?, ?> asyncPutFuts = new GridCompoundFuture<>();
+
+        Consumer<Integer> cachePutAsync = (key) -> asyncPutFuts.add(GridTestUtils.runAsync(() -> cachePut.accept(key)));
+
+        int primaryLwm = -1; // Primary LWM after data preparation (at tx caches).
+        int backupHwm = -1; // Backups HWM after data preparation (at tx caches).
+
+        String backupMissedTail = null; // Misses after backupHwm, which backups are not aware of before the recovery.
+
+        int primaryKeysCnt = preloadCnt; // Keys present on primary.
+
+        int iters = 11;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        // The main idea of this section is to generate missed counters updates, some of them should be on backups only,
+        // some at primary too.
+        // This emulates the situation of reordering possible on real highly loaded clusters.
+        for (int it = 0; it < iters; it++) {
+            boolean first = it == 0;
+            boolean last = it == iters - 1;
+
+            boolean globalBlock = it % 2 != 0 && // Global means all nodes (including primary) will miss the counters updates.
+                // Atomic cache commits entry on primary before sending the request to backups, so all misses are only on backups.
+                atomicityMode != ATOMIC;
+
+            if (first || last) // Odd number to gain misses on backups only at first and last iteration.
+                // First iteration will guarantee rebalance for tx caches: Primary LWM > backup LWM.
+                // Last iteration will guarantee rebalance for atomic caches: Primary counter > backup counter.
+                assertTrue(!globalBlock);
+
+            int range = rnd.nextInt(1, 5);
+
+            blockLatch.set(new CountDownLatch(backupNodes * range));
+
+            if (globalBlock)
+                prepareBlock.set(true);
+            else
+                finishBlock.set(true);
+
+            for (int i = 0; i < range; i++) {
+                blockedKey++;
+                updateCnt++;
+
+                if (!globalBlock)
+                    primaryKeysCnt++;
+
+                for (Ignite backup : backups) // Check before put.
+                    assertEquals(reuseKeys, backup.cache(DEFAULT_CACHE_NAME).get(blockedKey) != null);
+
+                cachePutAsync.accept(blockedKey);
+            }
+
+            blockLatch.get().await();
+
+            prepareBlock.set(false);
+            finishBlock.set(false);
+
+            String missed = range == 1 ? String.valueOf(updateCnt) : (updateCnt - range + 1) + " - " + updateCnt;
+
+            if (last)
+                backupMissedTail = missed;
+            else
+                backupMissed.add(missed);
+
+            if (globalBlock)
+                primaryMissed.add(missed);
+
+            if (!last)
+                for (int i = 0; i < range; i++) {
+                    committedKey++;
+                    updateCnt++;
+                    primaryKeysCnt++;
+
+                    cachePut.accept(committedKey);
+                }
+
+            if (first)
+                primaryLwm = updateCnt;
+
+            if (!last)
+                backupHwm = updateCnt;
+        }
+
+        assertNotNull(backupMissedTail);
+        assertTrue(primaryLwm != -1);
+        assertTrue(backupHwm != -1);
+        assertTrue(blockedKey < committedFrom); // Intersection check.
+
+        if (reuseKeys)
+            assertTrue(blockedKey < preloadCnt); // Overflow check.
+
+        for (int key = blockedFrom; key <= blockedKey; key++) {
+            for (Ignite backup : backups) // Check after put.
+                assertEquals(reuseKeys, backup.cache(DEFAULT_CACHE_NAME).get(key) != null);
+        }
+
+        for (int key = committedFrom; key <= committedKey; key++) {
+            assertNotNull(primCache.get(key));
+
+            for (Ignite backup : backups)
+                assertNotNull(backup.cache(DEFAULT_CACHE_NAME).get(key));
+        }
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+        assertConflicts(true, true);
+
+        if (atomicityMode == TRANSACTIONAL) {
+            assertTxCounters(primaryLwm, primaryMissed, updateCnt); // Primary
+            assertTxCounters(preloadCnt, backupMissed, backupHwm); // Backups
+        }
+        else {
+            assertAtomicCounters(updateCnt); // Primary
+            assertAtomicCounters(backupHwm); // Backups
+        }
+
+        LogListener lsnrRebalanceType = matches("fullPartitions=[" + (historical ? "" : 0) + "], " +
+            "histPartitions=[" + (historical ? 0 : "") + "]").times(backupNodes).build();
+
+        LogListener lsnrRebalanceAmount = matches("receivedKeys=" +
+            (historical ?
+                atomicityMode == TRANSACTIONAL ?
+                    primaryLwm - preloadCnt : // Diff between primary LWM and backup LWM.
+                    updateCnt - backupHwm + // Diff between primary and backup counters
+                        DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE : // + magic number.
+                reuseKeys ?
+                    preloadCnt : // Since keys are reused, amount is the same as at initial preloading.
+                    atomicityMode == TRANSACTIONAL ?
+                        primaryKeysCnt : // Calculated amount of entries (initial preload amount + updates not missed at primary)
+                        updateCnt)) // All updates since only misses on backups generated for atomic caches at this test.
+            .times(backupNodes).build();
+
+        listeningLog.registerListener(lsnrRebalanceType);
+        listeningLog.registerListener(lsnrRebalanceAmount);
+
+        ioBlocked = true; // Emulating power off, OOM or disk overflow. Keeping data as is, with missed counters updates.
+
+        stopAllGrids();
+
+        checkAsyncPutOperationsFinished(asyncPutFuts);
+
+        ioBlocked = false; // NOTE(review): static flag stays true if the test fails above; consider resetting in finally/afterTest.
+
+        ignite = startGrids(nodes);
+
+        awaitPartitionMapExchange();
+
+        assertTrue(lsnrRebalanceType.check());
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+        assertConflicts(
+            atomicityMode == TRANSACTIONAL, // Atomic cache backup counters are automatically set as primary on crash recovery.
+            historical); // Full rebalance fixes the consistency.
+
+        assertTrue(lsnrRebalanceAmount.check());
+
+        if (atomicityMode == TRANSACTIONAL) { // Same as before the crash.
+            assertTxCounters(primaryLwm, primaryMissed, updateCnt); // Primary
+            assertTxCounters(preloadCnt, backupMissed, backupHwm); // Backups
+        }
+        else if (historical) {
+            assertAtomicCounters(updateCnt); // Primary
+
+            // Backups updated with primary counter.
+            // Entries between primary and backup counters are rebalanced.
+            assertNoneAtomicCounters(backupHwm); // Automatically "repaired".
+        }
+        else
+            assertNoneAtomicCounters();
+
+        assertEquals(EXIT_CODE_OK, execute("--consistency", "repair",
+            ConsistencyCommand.CACHE, DEFAULT_CACHE_NAME,
+            ConsistencyCommand.PARTITION, "0",
+            ConsistencyCommand.STRATEGY, strategy.toString()));
+
+        int repairedCnt = repairedEntriesCount();
+
+        assertContains(log, testOut.toString(), historical ? CONSISTENCY_VIOLATIONS_FOUND : NOTHING_FOUND);
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+        switch (strategy) {
+            case PRIMARY:
+            case REMOVE:
+            case RELATIVE_MAJORITY:
+                assertConflicts(atomicityMode == TRANSACTIONAL, false); // Inconsistency fixed.
+
+                break;
+
+            case LWW:
+                // LWW can't fix the data when some nodes contain the key, but some not,
+                // because missed entry has no version and can not be compared.
+                assertConflicts(atomicityMode == TRANSACTIONAL, historical && !reuseKeys);
+
+                break;
+
+            case CHECK_ONLY: // Keeps as is.
+                assertConflicts(atomicityMode == TRANSACTIONAL, historical);
+
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Unsupported strategy");
+        }
+
+        int postloadFrom = reuseKeys ? 0 : (committedKey + 1 /*greater than max key used*/);
+
+        for (int i = postloadFrom; i < postloadFrom + preloadCnt; i++) {
+            updateCnt++;
+
+            ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
+        }
+
+        // Repairing one more time, but with guarantee to fix (primary strategy).
+        assertEquals(EXIT_CODE_OK, execute("--consistency", "repair",
+            ConsistencyCommand.CACHE, DEFAULT_CACHE_NAME,
+            ConsistencyCommand.PARTITION, "0",
+            ConsistencyCommand.STRATEGY, PRIMARY.toString()));
+
+        repairedCnt += repairedEntriesCount();
+
+        updateCnt += repairedCnt;
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+        backupMissed.add(backupMissedTail); // Now backups got updates after the generated data and aware of all missed updates.
+
+        if (atomicityMode == TRANSACTIONAL) {
+            assertConflicts(true, false); // Still has counters conflicts, while inconsistency already fixed.
+
+            assertTxCounters(primaryLwm, primaryMissed, updateCnt); // Primary (same as before crash).
+            assertTxCounters(preloadCnt, backupMissed, updateCnt); // Backups (all missed updates, increased hwm).
+        }
+        else
+            assertConflicts(false, false); // Fixed both counters and inconsistency.
+
+        int rmvd = strategy == RELATIVE_MAJORITY || strategy == REMOVE ? repairedCnt /*entries removed during the repair*/ : 0;
+
+        for (Ignite node : G.allGrids()) // Checking the cache size after the fixes.
+            assertEquals((reuseKeys ? preloadCnt : primaryKeysCnt + preloadCnt /*postload*/ - rmvd),
+                node.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.ALL));
+    }
+
+    /**
+     * Checks idle_verify result.
+     *
+     * @param counter Whether a counter conflict is expected.
+     * @param hash    Whether a hash conflict is expected.
+     */
+    private void assertConflicts(boolean counter, boolean hash) {
+        if (counter || hash)
+            assertContains(log, testOut.toString(), "conflict partitions has been found: " +
+                "[counterConflicts=" + (counter ? 1 : 0) + ", hashConflicts=" + (hash ? 1 : 0) + "]");
+        else
+            assertContains(log, testOut.toString(), "no conflicts have been found");
+    }
+
+    /**
+     * @param lwm Expected {@code lwm} value of the update counter printed to the captured output.
+     * @param missed Expected {@code missed} ranges of the update counter printed to the captured output.
+     * @param hwm Expected {@code hwm} value of the update counter printed to the captured output.
+     */
+    private void assertTxCounters(int lwm, List<String> missed, int hwm) {
+        assertContains(log, testOut.toString(),
+            "updateCntr=[lwm=" + lwm + ", missed=" + missed + ", hwm=" + hwm + "]");
+    }
+
+    /**
+     * @param cnt Expected {@code updateCntr} value which must be present in the captured output.
+     */
+    private void assertAtomicCounters(int cnt) {
+        assertContains(log, testOut.toString(), "updateCntr=" + cnt);
+    }
+
+    /**
+     * @param cnt Counter value which must be absent in the captured output; if omitted, no {@code updateCntr=} is allowed at all.
+     */
+    private void assertNoneAtomicCounters(int... cnt) {
+        assertNotContains(log, testOut.toString(), "updateCntr=" + (F.isEmpty(cnt) ? "" : cnt[0]));
+    }
+
+    /**
+     * @return Amount of entries repaired by Consistency Repair operation (first {@code repaired=N,} in the output), or 0.
+     */
+    private int repairedEntriesCount() {
+        // "\\d+" instead of "\\d*": an empty number must not match, otherwise Integer.parseInt("") would throw.
+        Matcher matcher = Pattern.compile("repaired=(\\d+),").matcher(testOut.toString());
+
+        return matcher.find() ?
+            Integer.parseInt(matcher.group(1)) :
+            0;
+    }
+
+    /**
+     * Checks that all async put operations are finished.
+     */
+    private void checkAsyncPutOperationsFinished(GridCompoundFuture<?, ?> asyncPutFuts) {
+        asyncPutFuts.markInitialized();
+
+        try {
+            asyncPutFuts.get();
+
+            if (atomicityMode != ATOMIC) // Atomics are already committed at primary before the fail, so no failure on get() is expected.
+                fail(); // But tx cache is still committing, so get() must throw an exception.
+        }
+        catch (IgniteCheckedException | TransactionHeuristicException ex) {
+            assertTrue(atomicityMode != ATOMIC);
+        }
+    }
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 296cee8783a..1669e2251e6 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -44,10 +44,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
@@ -64,7 +62,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.ShutdownPolicy;
-import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -89,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -140,11 +136,9 @@ import org.junit.Test;
 
 import static java.io.File.separatorChar;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CLUSTER_NAME;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
@@ -2316,162 +2310,6 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
             fail("Should be found dump with conflicts");
     }
 
-    /**
-     * Tests that idle verify checks gaps.
-     */
-    @Test
-    public void testCacheIdleVerifyChecksGapsAtomic() throws Exception {
-        testCacheIdleVerifyChecksGaps(ATOMIC);
-    }
-
-    /**
-     * Tests that idle verify checks gaps.
-     */
-    @Test
-    public void testCacheIdleVerifyChecksGapsTx() throws Exception {
-        testCacheIdleVerifyChecksGaps(TRANSACTIONAL);
-    }
-
-    /**
-     * Tests that idle verify checks gaps.
-     */
-    private void testCacheIdleVerifyChecksGaps(CacheAtomicityMode atomicityMode) throws Exception {
-        int parts = 1;
-
-        IgniteEx ignite = startGrids(3);
-
-        ignite.cluster().active(true);
-
-        int backups = 2;
-
-        IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
-            .setAffinity(new RendezvousAffinityFunction(false, parts))
-            .setBackups(backups)
-            .setName(DEFAULT_CACHE_NAME)
-            .setAtomicityMode(atomicityMode)
-            .setWriteSynchronizationMode(PRIMARY_SYNC)
-            .setReadFromBackup(true));
-
-        int cnt = 0;
-
-        for (int i = 0; i < 100; i++) {
-            cache.put(i, i);
-
-            cnt++;
-        }
-
-        int cntFrom = cnt;
-
-        Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
-        Ignite backup = backupNode(0L, DEFAULT_CACHE_NAME);
-
-        TestRecordingCommunicationSpi primSpi = TestRecordingCommunicationSpi.spi(prim);
-
-        AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
-
-        primSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
-            @Override public boolean apply(ClusterNode node, Message msg) {
-                if (msg instanceof GridDhtTxFinishRequest ||
-                    msg instanceof GridDhtAtomicSingleUpdateRequest) {
-                    CountDownLatch blockLatch = latchRef.get();
-
-                    boolean block = blockLatch.getCount() > 0;
-
-                    blockLatch.countDown();
-
-                    return block; // Generating counter gaps.
-                }
-                else
-                    return false;
-            }
-        });
-
-        int blockedKey = cntFrom + 1_000;
-        int committedKey = blockedKey + 1_000;
-
-        int blockedFrom = blockedKey;
-        int committedFrom = committedKey;
-
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-        List<String> gaps = new ArrayList<>();
-
-        Consumer<Integer> cachePut = (key) -> {
-            if (atomicityMode == TRANSACTIONAL)
-                try (Transaction tx = prim.transactions().txStart()) {
-                    prim.cache(DEFAULT_CACHE_NAME).put(key, key);
-
-                    tx.commit();
-                }
-            else
-                prim.cache(DEFAULT_CACHE_NAME).put(key, key);
-        };
-
-        for (int it = 0; it < 10; it++) {
-            int range = rnd.nextInt(3);
-
-            CountDownLatch blockLatch = new CountDownLatch(backups * range);
-
-            latchRef.set(blockLatch);
-
-            for (int i = 0; i < range; i++) {
-                cachePut.accept(blockedKey++);
-
-                cnt++;
-            }
-
-            if (range == 1)
-                gaps.add(String.valueOf(cnt));
-            else if (range > 1)
-                gaps.add((cnt - range + 1) + " - " + cnt);
-
-            blockLatch.await();
-
-            for (int i = 0; i < range; i++) {
-                cachePut.accept(committedKey++);
-
-                cnt++;
-            }
-        }
-
-        for (int key = blockedFrom; key < blockedKey; key++) {
-            assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
-            assertNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key)); // Commit is blocked.
-        }
-
-        for (int key = committedFrom; key < committedKey; key++) {
-            assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
-            assertNotNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
-        }
-
-        G.restart(true);
-
-        ignite.cluster().active(true);
-
-        awaitPartitionMapExchange();
-
-        injectTestSystemOut();
-
-        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
-
-        if (atomicityMode == TRANSACTIONAL) {
-            assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=1, " +
-                "hashConflicts=1]");
-
-            assertContains(log, testOut.toString(),
-                "updateCntr=[lwm=" + cnt + ", missed=[], hwm=" + cnt + "]"); // Primary
-
-            assertContains(log, testOut.toString(),
-                "updateCntr=[lwm=" + cntFrom + ", missed=" + gaps + ", hwm=" + cnt + "]"); // Backups.
-        }
-        else {
-            assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=0, " +
-                "hashConflicts=1]");
-
-            assertContains(log, testOut.toString(), "updateCntr=" + cnt); // All
-        }
-    }
-
     /**
      * @return Build matcher for dump file name.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
index 2e3066a2a04..5c1b2844a1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterMvccImpl.java
@@ -49,11 +49,11 @@ public class PartitionUpdateCounterMvccImpl extends PartitionUpdateCounterTracki
     @Override public PartitionUpdateCounter copy() {
         PartitionUpdateCounterMvccImpl copy = new PartitionUpdateCounterMvccImpl(grp);
 
-        copy.cntr.set(cntr.get());
+        copy.lwm.set(lwm.get());
         copy.first = first;
         copy.queue = new TreeMap<>(queue);
         copy.initCntr = initCntr;
-        copy.reserveCntr.set(reserveCntr.get());
+        copy.reservedCntr.set(reservedCntr.get());
 
         return copy;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index 80bb1807cde..e75872b2cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -74,10 +74,10 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
     protected NavigableMap<Long, Item> queue = new TreeMap<>();
 
     /** LWM. */
-    protected final AtomicLong cntr = new AtomicLong();
+    protected final AtomicLong lwm = new AtomicLong();
 
-    /** HWM. */
-    protected final AtomicLong reserveCntr = new AtomicLong();
+    /** Reserved. */
+    protected final AtomicLong reservedCntr = new AtomicLong();
 
     /** */
     protected boolean first = true;
@@ -100,13 +100,13 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
 
     /** {@inheritDoc} */
     @Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
-        cntr.set(initUpdCntr);
+        lwm.set(initUpdCntr);
 
         initCntr = initUpdCntr;
 
         queue = fromBytes(cntrUpdData);
 
-        reserveCntr.set(highestAppliedCounter());
+        reservedCntr.set(highestAppliedCounter());
     }
 
     /** {@inheritDoc} */
@@ -116,21 +116,21 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
 
     /** {@inheritDoc} */
     @Override public long get() {
-        return cntr.get();
+        return lwm.get();
     }
 
     /** */
     protected synchronized long highestAppliedCounter() {
-        return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute();
+        return queue.isEmpty() ? lwm.get() : queue.lastEntry().getValue().absolute();
     }
 
     /**
      * @return Next update counter. For tx mode called by {@link DataStreamerImpl} IsolatedUpdater.
      */
     @Override public long next() {
-        long next = cntr.incrementAndGet();
+        long next = lwm.incrementAndGet();
 
-        reserveCntr.set(next);
+        reservedCntr.set(next);
 
         return next;
     }
@@ -138,25 +138,27 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
     /** {@inheritDoc} */
     @Override public synchronized void update(long val) throws IgniteCheckedException {
         // Reserved update counter is updated only on exchange.
-        long cur = get();
+        long curLwm = lwm.get();
 
         // Always set reserved counter equal to max known counter.
-        long max = Math.max(val, cur);
+        long max = Math.max(val, curLwm);
+        long reserved = reservedCntr.get();
 
-        if (reserveCntr.get() < max)
-            reserveCntr.set(max);
+        if (reserved < max)
+            reservedCntr.set(max);
 
         // Outdated counter (txs are possible before current topology future is finished if primary is not changed).
-        if (val < cur)
+        if (val < curLwm)
             return;
 
         // Absolute counter should be not less than last applied update.
         // Otherwise supplier doesn't contain some updates and rebalancing couldn't restore consistency.
         // Best behavior is to stop node by failure handler in such a case.
         if (val < highestAppliedCounter())
-            throw new IgniteCheckedException("Failed to update the counter [newVal=" + val + ", curState=" + this + ']');
+            throw new IgniteCheckedException("Failed to update the counter " +
+                "[newVal=" + val + ", prevReserved=" + reserved + ", curState=" + this + ']');
 
-        cntr.set(val);
+        lwm.set(val);
 
         /** If some holes are present at this point, that means some updates were missed on recovery and will be restored
          * during rebalance. All gaps are safe to "forget".
@@ -170,7 +172,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
 
     /** {@inheritDoc} */
     @Override public synchronized boolean update(long start, long delta) {
-        long cur = cntr.get();
+        long cur = lwm.get();
 
         if (cur > start)
             return false;
@@ -213,7 +215,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
             if (nextItem != null)
                 next += nextItem.delta;
 
-            boolean res = cntr.compareAndSet(cur, next);
+            boolean res = lwm.compareAndSet(cur, next);
 
             assert res;
 
@@ -227,8 +229,8 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
 
         initCntr = get();
 
-        if (reserveCntr.get() < initCntr)
-            reserveCntr.set(initCntr);
+        if (reservedCntr.get() < initCntr)
+            reservedCntr.set(initCntr);
     }
 
     /** {@inheritDoc} */
@@ -241,37 +243,37 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
             if (gaps == null)
                 gaps = new GridLongList((queue.size() + 1) * 2);
 
-            long start = cntr.get() + 1;
+            long start = lwm.get() + 1;
             long end = item.getValue().start;
 
             gaps.add(start);
             gaps.add(end);
 
             // Close pending ranges.
-            cntr.set(item.getValue().absolute());
+            lwm.set(item.getValue().absolute());
 
             item = queue.pollFirstEntry();
         }
 
-        reserveCntr.set(get());
+        reservedCntr.set(get());
 
         return gaps;
     }
 
     /** {@inheritDoc} */
     @Override public synchronized long reserve(long delta) {
-        long cntr = get();
+        long lwm = get();
 
-        long reserved = reserveCntr.getAndAdd(delta);
+        long reserved = reservedCntr.getAndAdd(delta);
 
-        assert reserved >= cntr : "LWM after HWM: lwm=" + cntr + ", hwm=" + reserved + ", cntr=" + toString();
+        assert reserved >= lwm : "LWM after reserved: lwm=" + lwm + ", reserved=" + reserved + ", cntr=" + this;
 
         return reserved;
     }
 
     /** {@inheritDoc} */
     @Override public long next(long delta) {
-        return cntr.getAndAdd(delta);
+        return lwm.getAndAdd(delta);
     }
 
     /** {@inheritDoc} */
@@ -344,9 +346,9 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
     @Override public synchronized void reset() {
         initCntr = 0;
 
-        cntr.set(0);
+        lwm.set(0);
 
-        reserveCntr.set(0);
+        reservedCntr.set(0);
 
         queue.clear();
     }
@@ -361,7 +363,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
      */
     private static class Item {
         /** */
-        private long start;
+        private final long start;
 
         /** */
         private long delta;
@@ -431,12 +433,12 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
         if (!queue.equals(cntr.queue))
             return false;
 
-        return this.cntr.get() == cntr.cntr.get();
+        return lwm.get() == cntr.lwm.get();
     }
 
     /** {@inheritDoc} */
     @Override public long reserved() {
-        return reserveCntr.get();
+        return reservedCntr.get();
     }
 
     /** {@inheritDoc} */
@@ -452,21 +454,21 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
     /**
      * Human-readable missed unordered updates.
      */
-    private String gaps() {
-        List<String> gaps = new ArrayList<>();
+    private String missed() {
+        List<String> missed = new ArrayList<>();
 
-        long prev = cntr.get();
+        long prev = lwm.get();
 
         for (Item item : queue.values()) {
             if (prev + 1 == item.start)
-                gaps.add(String.valueOf(item.start));
+                missed.add(String.valueOf(item.start));
             else
-                gaps.add((prev + 1) + " - " + item.start);
+                missed.add((prev + 1) + " - " + item.start);
 
             prev = item.start + item.delta;
         }
 
-        return gaps.toString();
+        return missed.toString();
     }
 
     /** {@inheritDoc} */
@@ -476,7 +478,7 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
         long hwm;
 
         synchronized (this) {
-            missed = gaps();
+            missed = missed();
 
             lwm = get();
 
@@ -499,16 +501,16 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
         String missed;
         long lwm;
         long hwm;
-        long maxApplied;
+        long reserved;
 
         synchronized (this) {
-            missed = gaps();
+            missed = missed();
 
             lwm = get();
 
-            hwm = reserveCntr.get();
+            hwm = highestAppliedCounter();
 
-            maxApplied = highestAppliedCounter();
+            reserved = reservedCntr.get();
         }
 
         return new SB()
@@ -516,10 +518,10 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
             .a(lwm)
             .a(", missed=")
             .a(missed)
-            .a(", maxApplied=")
-            .a(maxApplied)
             .a(", hwm=")
             .a(hwm)
+            .a(", reserved=")
+            .a(reserved)
             .a(']')
             .toString();
     }
@@ -533,11 +535,11 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
     @Override public PartitionUpdateCounter copy() {
         PartitionUpdateCounterTrackingImpl copy = createInstance();
 
-        copy.cntr.set(cntr.get());
+        copy.lwm.set(lwm.get());
         copy.first = first;
         copy.queue = new TreeMap<>(queue);
         copy.initCntr = initCntr;
-        copy.reserveCntr.set(reserveCntr.get());
+        copy.reservedCntr.set(reservedCntr.get());
 
         return copy;
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
index 960fa7f4ace..c7df9186058 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSpuriousRebalancingOnNodeJoinTest.java
@@ -154,7 +154,7 @@ public class IgnitePdsSpuriousRebalancingOnNodeJoinTest extends GridCommonAbstra
 
             PartitionUpdateCounterTrackingImpl delegate = U.field(cntr0, "delegate");
 
-            AtomicLong cntr = U.field(delegate, "cntr");
+            AtomicLong cntr = U.field(delegate, "lwm");
 
             cntr.set(cntr.get() - 1);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index d9f8ade92ae..31c4b9200e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1127,7 +1127,7 @@ public final class GridTestUtils {
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static IgniteInternalFuture runAsync(final Runnable task) {
+    public static <T> IgniteInternalFuture<T> runAsync(final Runnable task) {
         return runAsync(task, "async-runnable-runner");
     }
 
@@ -1137,7 +1137,7 @@ public final class GridTestUtils {
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static IgniteInternalFuture runAsync(final RunnableX task) {
+    public static <T> IgniteInternalFuture<T> runAsync(final RunnableX task) {
         return runAsync(task, "async-runnable-runner");
     }
 
@@ -1147,7 +1147,7 @@ public final class GridTestUtils {
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static IgniteInternalFuture runAsync(final Runnable task, String threadName) {
+    public static <T> IgniteInternalFuture<T> runAsync(final Runnable task, String threadName) {
         return runAsync(() -> {
             task.run();
 
@@ -1161,7 +1161,7 @@ public final class GridTestUtils {
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static IgniteInternalFuture runAsync(final RunnableX task, String threadName) {
+    public static <T> IgniteInternalFuture<T> runAsync(final RunnableX task, String threadName) {
         return runAsync(() -> {
             task.run();