You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/04 21:22:14 UTC

[ignite] branch master updated: IGNITE-14425 Hang transactions in FINISH [COMMIT] phase when communication spi is blocked. Fixes #8936

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a4bec2d  IGNITE-14425 Hang transactions in FINISH [COMMIT] phase when communication spi is blocked. Fixes #8936
a4bec2d is described below

commit a4bec2d68cfc22962fee483efcf11919625f574d
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Mon Apr 5 00:18:22 2021 +0300

    IGNITE-14425 Hang transactions in FINISH [COMMIT] phase when communication spi is blocked. Fixes #8936
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../cache/transactions/IgniteTxManager.java        |   5 +-
 .../transactions/TxRollbackOnTimeoutTest.java      | 101 ++++++++++++++++++---
 2 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index b189a3d..c3c07b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3304,8 +3304,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                                             tx.rollbackAsync();
                                     });
                                 }
-                                // If we could not mark tx as rollback, it means that transaction is being committed.
-                                else if (tx.setRollbackOnly())
+                                // If we could not mark tx as rollback, it means that transaction
+                                // is either being committed or already marked as rollback.
+                                else if (tx.setRollbackOnly() || tx.state() == MARKED_ROLLBACK)
                                     tx.rollbackAsync();
                             }
                         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index b03853e..058b8c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +46,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.util.typedef.F;
@@ -68,6 +71,8 @@ import org.junit.Test;
 import static java.lang.Thread.sleep;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitQuiet;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -314,7 +319,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
 
                         l.countDown();
 
-                        U.awaitQuiet(l);
+                        awaitQuiet(l);
 
                         node1.cache(CACHE_NAME).put(2, 20);
 
@@ -337,7 +342,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
 
                     l.countDown();
 
-                    U.awaitQuiet(l);
+                    awaitQuiet(l);
 
                     node2.cache(CACHE_NAME).put(1, 1);
 
@@ -546,14 +551,14 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
 
                         readStartLatch.countDown();
 
-                        U.awaitQuiet(commitLatch);
+                        awaitQuiet(commitLatch);
 
                         tx.commit();
                     }
                 }
                 else {
                     try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 300, 1)) {
-                        U.awaitQuiet(readStartLatch);
+                        awaitQuiet(readStartLatch);
 
                         client.cache(CACHE_NAME).get(0); // Lock acquisition is queued.
                     }
@@ -712,12 +717,12 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
         List<Integer> keys = movingKeysAfterJoin(grid(1), CACHE_NAME, 1);
 
         // Delay exchange finish on server nodes if clientWait=true, or on all nodes otherwise (excluding joining node).
-        TestRecordingCommunicationSpi.spi(crd).blockMessages((node,
+        spi(crd).blockMessages((node,
             msg) -> node.order() < 5 && msg instanceof GridDhtPartitionsFullMessage &&
             (!clientWait || node.order() != grid(1).cluster().localNode().order()));
 
         // Delay prepare until exchange is finished.
-        TestRecordingCommunicationSpi.spi(client).blockMessages((node, msg) -> {
+        spi(client).blockMessages((node, msg) -> {
             boolean block = false;
 
             if (concurrency == PESSIMISTIC) {
@@ -760,7 +765,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
         IgniteInternalFuture fut1 = runAsync(new Runnable() {
             @Override public void run() {
                 try {
-                    TestRecordingCommunicationSpi.spi(client).waitForBlocked(); // TX is trying to prepare on prev top ver.
+                    spi(client).waitForBlocked(); // TX is trying to prepare on prev top ver.
 
                     startGrid(GRID_CNT);
                 }
@@ -774,10 +779,10 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
             @Override public void run() {
                 try {
                     // Wait for all full messages to be ready.
-                    TestRecordingCommunicationSpi.spi(crd).waitForBlocked(GRID_CNT + (clientWait ? 0 : 1));
+                    spi(crd).waitForBlocked(GRID_CNT + (clientWait ? 0 : 1));
 
                     // Trigger remap.
-                    TestRecordingCommunicationSpi.spi(client).stopBlock();
+                    spi(client).stopBlock();
                 }
                 catch (Exception e) {
                     fail(e.getMessage());
@@ -789,7 +794,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
         fut1.get(30_000);
         fut2.get(30_000);
 
-        TestRecordingCommunicationSpi.spi(crd).stopBlock();
+        spi(crd).stopBlock();
 
         // FIXME: If using awaitPartitionMapExchange for waiting it some times fail while waiting for owners.
         IgniteInternalFuture<?> topFut = ((IgniteEx)client).context().cache().context().exchange().
@@ -1052,7 +1057,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
                     }
 
                     // Will be unblocked after tx timeout occurs.
-                    U.awaitQuiet(unblocked);
+                    awaitQuiet(unblocked);
 
                     try {
                         near.cache(CACHE_NAME).put(0, 0);
@@ -1085,7 +1090,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
 
         IgniteInternalFuture<?> fut2 = runAsync(new Runnable() {
             @Override public void run() {
-                U.awaitQuiet(blocked);
+                awaitQuiet(blocked);
 
                 try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
                     for (int i = 0; i < recordsCnt; i++)
@@ -1155,7 +1160,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
 
         IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() {
             @Override public void run() {
-                U.awaitQuiet(blocked);
+                awaitQuiet(blocked);
 
                 try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, recordsCnt)) {
                     for (int i = 0; i < recordsCnt; i++)
@@ -1178,4 +1183,74 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
 
         fut2.get();
     }
+
+    /**
+     * Tests that transactions from near node that has left are rolled back on server nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRollbackOnNearNodeLeft() throws Exception {
+        Ignite client = startClient();
+
+        Integer pk0 = primaryKey(grid(0).cache(CACHE_NAME));
+        Integer pk1 = primaryKey(grid(1).cache(CACHE_NAME));
+
+        CountDownLatch locked = new CountDownLatch(1);
+        CountDownLatch blocked = new CountDownLatch(1);
+
+        IgniteInternalFuture<Void> fut = runAsync(new Callable<Void>() {
+            @Override public Void call() {
+                try (Transaction tx0 = client.transactions().txStart()) {
+                    client.cache(CACHE_NAME).put(pk0, 0);
+
+                    locked.countDown();
+
+                    awaitQuiet(blocked);
+
+                    tx0.commit();
+                }
+                catch (Exception ignored) {
+                    // No-op.
+                }
+                return null;
+            }
+        });
+
+        IgniteInternalFuture fut2 = runAsync(new Runnable() {
+            @Override public void run() {
+                try (Transaction tx1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 1000, 0)) {
+                    awaitQuiet(locked);
+
+                    client.cache(CACHE_NAME).put(pk1, 1);
+
+                    spi(client).blockMessages((node, msg) -> msg instanceof GridNearTxFinishRequest);
+                    spi(grid(0)).blockMessages((node, msg) -> msg instanceof GridNearLockResponse);
+
+                    client.cache(CACHE_NAME).put(pk0, 1);
+
+                    fail();
+                }
+                catch (Exception e) {
+                    assertTrue(X.hasCause(e, TransactionTimeoutException.class));
+                }
+            }
+        });
+
+        spi(client).waitForBlocked();
+        spi(grid(0)).waitForBlocked();
+
+        fut2.get();
+
+        client.close();
+
+        spi(grid(0)).stopBlock();
+
+        blocked.countDown();
+
+        fut.get();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            assertTrue(grid(i).context().cache().context().tm().activeTransactions().isEmpty());
+    }
 }