Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/08/04 09:37:44 UTC

[GitHub] [ignite] anton-vinogradov commented on a diff in pull request #10178: IGNITE-17457 Fix cluster lock after tx recovery

anton-vinogradov commented on code in PR #10178:
URL: https://github.com/apache/ignite/pull/10178#discussion_r937556299


##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+            final IgniteEx grid0 = startGrid("g0");
+            final IgniteEx grid1 = startGrid("g1",
+                    cfg -> cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+            final IgniteEx grid2 = startGrid("g2");
+
+            grid0.cluster().state(ACTIVE);
+
+            final IgniteCache<Object, Object> cache = grid2.cache(DEFAULT_CACHE_NAME);
+
+            final Integer g2Key = primaryKeys(grid2.cache(DEFAULT_CACHE_NAME), 1, 0).get(0);
+
+            List<IgniteInternalTx> txs0 = null;
+            List<IgniteInternalTx> txs1 = null;
+
+            CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+            try (final Transaction tx = grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(g2Key, Boolean.TRUE);
+                TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+                p.tx().prepare(true);
+
+                txs0 = txs(grid0);
+                txs1 = txs(grid1);
+                List<IgniteInternalTx> txs2 = txs(grid2);
+
+                assertTrue(txs0.size() == 1);
+                assertTrue(txs1.size() == 1);
+                assertTrue(txs2.size() == 1);
+
+                // Prevent tx recovery request to be sent from grid0 to grid1.
+                spi(grid0).blockMessages(GridCacheTxRecoveryRequest.class, grid1.name());
+
+                // Block recovery procedure processing on grid1
+                grid1.context().pools().getSystemExecutorService().execute(() -> U.awaitQuiet(grid1BlockLatch));
+                // Block stripe tx recovery request processing on grid1
+                int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+                grid1.context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(grid1BlockLatch));
+
+                // Prevent finish request processing on grid0 and grid1.
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid0.name());
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid1.name());
+
+                runAsync(() -> {
+                    grid2.close();
+                    return null;
+                });
+            }
+            catch (Exception ignored) {
+                // Expected.
+            }
+
+            // Wait grid0 is ready to send the tx recovery request to grid1
+            spi(grid0).waitForBlocked();
+            // Let grid0 send the tx recovery request to grid1
+            log.info("unblock grid0");
+            spi(grid0).stopBlock();
+            // Give grid1 some time to receive the tx recovery request (processing is still blocked in grid1).
+            log.info("sleep in grid0");
+            doSleep(100);
+
+            // Unblock processing in grid1. Simultaneously in striped and system pools to start
+            // recovery procedure and the tx recovery request processing at the "same" moment
+            // (for the same transaction). This should increase chances for race condition occur in
+            // the IgniteTxAdapter::markFinalizing.
+            log.info("unblock grid1");

Review Comment:
   Please get rid of helpless logging.
   Use asserts "to guarantee" instead of logging "to find".
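   For illustration, a minimal sketch of the suggested direction (the awaited condition is only an example; GridTestUtils.waitForCondition is the usual helper here):
   ```java
   // A log line such as log.info("unblock grid0") only helps a human reading the output.
   // An assert turns the same expectation into a guarantee the test enforces itself:
   spi(grid0).stopBlock();

   assertTrue("Recovery request never reached grid1",
       GridTestUtils.waitForCondition(() -> txs(grid1).size() == 1, 5_000));
   ```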



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+            final IgniteEx grid0 = startGrid("g0");
+            final IgniteEx grid1 = startGrid("g1",
+                    cfg -> cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+            final IgniteEx grid2 = startGrid("g2");
+
+            grid0.cluster().state(ACTIVE);
+
+            final IgniteCache<Object, Object> cache = grid2.cache(DEFAULT_CACHE_NAME);
+
+            final Integer g2Key = primaryKeys(grid2.cache(DEFAULT_CACHE_NAME), 1, 0).get(0);
+
+            List<IgniteInternalTx> txs0 = null;
+            List<IgniteInternalTx> txs1 = null;
+
+            CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+            try (final Transaction tx = grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(g2Key, Boolean.TRUE);
+                TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+                p.tx().prepare(true);
+
+                txs0 = txs(grid0);
+                txs1 = txs(grid1);
+                List<IgniteInternalTx> txs2 = txs(grid2);
+
+                assertTrue(txs0.size() == 1);
+                assertTrue(txs1.size() == 1);
+                assertTrue(txs2.size() == 1);
+
+                // Prevent tx recovery request to be sent from grid0 to grid1.
+                spi(grid0).blockMessages(GridCacheTxRecoveryRequest.class, grid1.name());
+
+                // Block recovery procedure processing on grid1
+                grid1.context().pools().getSystemExecutorService().execute(() -> U.awaitQuiet(grid1BlockLatch));
+                // Block stripe tx recovery request processing on grid1
+                int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+                grid1.context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(grid1BlockLatch));
+
+                // Prevent finish request processing on grid0 and grid1.
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid0.name());
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid1.name());
+
+                runAsync(() -> {
+                    grid2.close();
+                    return null;
+                });
+            }
+            catch (Exception ignored) {
+                // Expected.
+            }
+
+            // Wait grid0 is ready to send the tx recovery request to grid1
+            spi(grid0).waitForBlocked();
+            // Let grid0 send the tx recovery request to grid1
+            log.info("unblock grid0");
+            spi(grid0).stopBlock();
+            // Give grid1 some time to receive the tx recovery request (processing is still blocked in grid1).
+            log.info("sleep in grid0");

Review Comment:
   Please separate semantic units according to 
   https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines#CodingGuidelines-SemanticUnits
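   Applied to the fragment above, that boils down to one comment per logical step and a blank line between steps, e.g.:
   ```java
   // Let grid0 send the tx recovery request to grid1.
   spi(grid0).stopBlock();

   // Give grid1 a chance to receive the request (its processing is still blocked).
   doSleep(100);
   ```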



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>

Review Comment:
   Javadoc must explain ideas, not retell the test content.
   Such an approach is not refactoring-friendly; please get rid of the retelling.
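   One possible idea-first phrasing (wording is only a sketch):
   ```java
   /**
    * Tests that concurrent processing of the same prepared transaction by the tx recovery
    * procedure and by the tx recovery request handler does not leave the transaction
    * unfinished on the backup nodes (the race window is in IgniteTxAdapter#markFinalizing).
    */
   ```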



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {

Review Comment:
   Why 75?
   Looks like a magic number.
   Use round numbers like 10, 100, ... when you just need some iterations, or explain your choice.
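   A sketch of the named-constant alternative (name and value are illustrative):
   ```java
   /** Attempts to reproduce the race; a round number with no deeper meaning. */
   private static final int ATTEMPTS = 100;
   ```
   with the loop header becoming `for (int iter = 0; iter < ATTEMPTS; iter++)`.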



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+            final IgniteEx grid0 = startGrid("g0");
+            final IgniteEx grid1 = startGrid("g1",
+                    cfg -> cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+            final IgniteEx grid2 = startGrid("g2");
+
+            grid0.cluster().state(ACTIVE);
+
+            final IgniteCache<Object, Object> cache = grid2.cache(DEFAULT_CACHE_NAME);
+
+            final Integer g2Key = primaryKeys(grid2.cache(DEFAULT_CACHE_NAME), 1, 0).get(0);
+
+            List<IgniteInternalTx> txs0 = null;
+            List<IgniteInternalTx> txs1 = null;
+
+            CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+            try (final Transaction tx = grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(g2Key, Boolean.TRUE);
+                TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+                p.tx().prepare(true);
+
+                txs0 = txs(grid0);
+                txs1 = txs(grid1);
+                List<IgniteInternalTx> txs2 = txs(grid2);
+
+                assertTrue(txs0.size() == 1);
+                assertTrue(txs1.size() == 1);
+                assertTrue(txs2.size() == 1);
+
+                // Prevent tx recovery request to be sent from grid0 to grid1.
+                spi(grid0).blockMessages(GridCacheTxRecoveryRequest.class, grid1.name());
+
+                // Block recovery procedure processing on grid1
+                grid1.context().pools().getSystemExecutorService().execute(() -> U.awaitQuiet(grid1BlockLatch));
+                // Block stripe tx recovery request processing on grid1
+                int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+                grid1.context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(grid1BlockLatch));
+
+                // Prevent finish request processing on grid0 and grid1.
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid0.name());
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid1.name());
+
+                runAsync(() -> {
+                    grid2.close();
+                    return null;
+                });
+            }
+            catch (Exception ignored) {
+                // Expected.
+            }
+
+            // Wait grid0 is ready to send the tx recovery request to grid1
+            spi(grid0).waitForBlocked();
+            // Let grid0 send the tx recovery request to grid1
+            log.info("unblock grid0");
+            spi(grid0).stopBlock();
+            // Give grid1 some time to receive the tx recovery request (processing is still blocked in grid1).
+            log.info("sleep in grid0");
+            doSleep(100);
+
+            // Unblock processing in grid1. Simultaneously in striped and system pools to start
+            // recovery procedure and the tx recovery request processing at the "same" moment
+            // (for the same transaction). This should increase chances for race condition occur in
+            // the IgniteTxAdapter::markFinalizing.
+            log.info("unblock grid1");
+            grid1BlockLatch.countDown();
+
+            // Wait transaction finish in grid0.
+            txs0.get(0).finishFuture().get(5_000);

Review Comment:
   Please never **get_with_timeout** without checking the result of the **get**.
   Also, try to avoid sleeps, timeouts, and so on; use concurrent latches, futures, etc. instead.
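   A sketch of checking the awaited outcome rather than relying on the timeout alone (state constants are from TransactionState; the timeout value is illustrative):
   ```java
   IgniteInternalTx tx0 = txs0.get(0);

   tx0.finishFuture().get(5_000);

   // Guarantee the outcome, not merely that the wait returned.
   assertTrue("Tx on grid0 must be finished after recovery",
       tx0.state() == COMMITTED || tx0.state() == ROLLED_BACK);
   ```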



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+            final IgniteEx grid0 = startGrid("g0");
+            final IgniteEx grid1 = startGrid("g1",
+                    cfg -> cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+            final IgniteEx grid2 = startGrid("g2");
+
+            grid0.cluster().state(ACTIVE);
+
+            final IgniteCache<Object, Object> cache = grid2.cache(DEFAULT_CACHE_NAME);
+
+            final Integer g2Key = primaryKeys(grid2.cache(DEFAULT_CACHE_NAME), 1, 0).get(0);
+
+            List<IgniteInternalTx> txs0 = null;
+            List<IgniteInternalTx> txs1 = null;
+
+            CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+            try (final Transaction tx = grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(g2Key, Boolean.TRUE);
+                TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+                p.tx().prepare(true);
+
+                txs0 = txs(grid0);
+                txs1 = txs(grid1);
+                List<IgniteInternalTx> txs2 = txs(grid2);
+
+                assertTrue(txs0.size() == 1);
+                assertTrue(txs1.size() == 1);
+                assertTrue(txs2.size() == 1);
+
+                // Prevent tx recovery request to be sent from grid0 to grid1.
+                spi(grid0).blockMessages(GridCacheTxRecoveryRequest.class, grid1.name());
+
+                // Block recovery procedure processing on grid1
+                grid1.context().pools().getSystemExecutorService().execute(() -> U.awaitQuiet(grid1BlockLatch));
+                // Block stripe tx recovery request processing on grid1
+                int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+                grid1.context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(grid1BlockLatch));
+
+                // Prevent finish request processing on grid0 and grid1.
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid0.name());
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid1.name());
+
+                runAsync(() -> {
+                    grid2.close();
+                    return null;
+                });
+            }
+            catch (Exception ignored) {
+                // Expected.
+            }
+
+            // Wait grid0 is ready to send the tx recovery request to grid1
+            spi(grid0).waitForBlocked();
+            // Let grid0 send the tx recovery request to grid1
+            log.info("unblock grid0");
+            spi(grid0).stopBlock();
+            // Give grid1 some time to receive the tx recovery request (processing is still blocked in grid1).
+            log.info("sleep in grid0");
+            doSleep(100);
+
+            // Unblock processing in grid1. Simultaneously in striped and system pools to start
+            // recovery procedure and the tx recovery request processing at the "same" moment
+            // (for the same transaction). This should increase chances for race condition occur in
+            // the IgniteTxAdapter::markFinalizing.
+            log.info("unblock grid1");
+            grid1BlockLatch.countDown();
+
+            // Wait transaction finish in grid0.
+            txs0.get(0).finishFuture().get(5_000);
+
+            try {
+                // Check if transaction is finished in grid1. It wouldn't if race condition occur.
+                txs1.get(0).finishFuture().get(20_000);

Review Comment:
   Looks like after the fix you need no timeout at all; please get rid of it if so.
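   I.e., once the fix guarantees completion, a plain blocking get is enough:
   ```java
   // The future must complete after the fix, so no timeout is needed.
   txs1.get(0).finishFuture().get();
   ```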



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+            final IgniteEx grid0 = startGrid("g0");
+            final IgniteEx grid1 = startGrid("g1",
+                    cfg -> cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+            final IgniteEx grid2 = startGrid("g2");

Review Comment:
   Please name something only when you're going to summon it by name.
   Please get rid of useless naming.
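   The pattern this points at, sketched (whether a given reference is actually unused depends on the final version of the test):
   ```java
   // Name a node only if the test interacts with it later...
   final IgniteEx grid0 = startGrid("g0");

   // ...otherwise start it anonymously.
   startGrid("g2");
   ```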



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
         assertEquals(s1, s2);
     }
 
+
+    /**
+     * The test enforces the concurrent processing of the same prepared transaction both in the
+     * tx recovery procedure started due to primary node left and in the tx recovery request handler
+     * invoked by message from another backup node.
+     * <ul>
+     *  <li>Start 3 nodes (g0, g1, g2) and cache with 2 backups.</li>
+     *  <li>Prepare a transaction with g2 as a primary node.</li>
+     *  <li>Kill g2.</li>
+     *  <li>Enforce the concurrent processing of transaction in tx recovery on g1.</li>
+     * </ul>
+     * Use several attempts to reproduce the race condition.
+     * <p>
+     * Expected result: transaction is finished both on g0 and g1
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 75; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+            final IgniteEx grid0 = startGrid("g0");
+            final IgniteEx grid1 = startGrid("g1",
+                    cfg -> cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+            final IgniteEx grid2 = startGrid("g2");
+
+            grid0.cluster().state(ACTIVE);
+
+            final IgniteCache<Object, Object> cache = grid2.cache(DEFAULT_CACHE_NAME);
+
+            final Integer g2Key = primaryKeys(grid2.cache(DEFAULT_CACHE_NAME), 1, 0).get(0);
+
+            List<IgniteInternalTx> txs0 = null;
+            List<IgniteInternalTx> txs1 = null;
+
+            CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+            try (final Transaction tx = grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(g2Key, Boolean.TRUE);
+                TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+                p.tx().prepare(true);
+
+                txs0 = txs(grid0);
+                txs1 = txs(grid1);
+                List<IgniteInternalTx> txs2 = txs(grid2);
+
+                assertTrue(txs0.size() == 1);
+                assertTrue(txs1.size() == 1);
+                assertTrue(txs2.size() == 1);
+
+                // Prevent tx recovery request to be sent from grid0 to grid1.
+                spi(grid0).blockMessages(GridCacheTxRecoveryRequest.class, grid1.name());
+
+                // Block recovery procedure processing on grid1
+                grid1.context().pools().getSystemExecutorService().execute(() -> U.awaitQuiet(grid1BlockLatch));
+                // Block stripe tx recovery request processing on grid1
+                int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+                grid1.context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(grid1BlockLatch));
+
+                // Prevent finish request processing on grid0 and grid1.
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid0.name());
+                spi(grid2).blockMessages(GridDhtTxFinishRequest.class, grid1.name());
+
+                runAsync(() -> {
+                    grid2.close();
+                    return null;
+                });
+            }
+            catch (Exception ignored) {
+                // Expected.
+            }
+
+            // Wait grid0 is ready to send the tx recovery request to grid1
+            spi(grid0).waitForBlocked();
+            // Let grid0 send the tx recovery request to grid1
+            log.info("unblock grid0");
+            spi(grid0).stopBlock();
+            // Give grid1 some time to receive the tx recovery request (processing is still blocked in grid1).
+            log.info("sleep in grid0");
+            doSleep(100);

Review Comment:
   Please never use sleeps in tests.
   Each sleep means you can't guarantee something but are trying to hide it.
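   A sketch of a deterministic alternative: wait for an observable effect of the message arrival instead of pausing (the recoveryRequestReceived flag is hypothetical and would be set by a message listener installed on grid1):
   ```java
   // Hypothetical flag flipped when grid1 receives GridCacheTxRecoveryRequest.
   AtomicBoolean recoveryRequestReceived = new AtomicBoolean();

   // ... then, after spi(grid0).stopBlock():
   assertTrue("grid1 never received the recovery request",
       GridTestUtils.waitForCondition(recoveryRequestReceived::get, 5_000));
   ```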



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
