You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/04 21:22:14 UTC
[ignite] branch master updated: IGNITE-14425 Hang transactions in FINISH [COMMIT] phase when communication spi is blocked. Fixes #8936
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a4bec2d IGNITE-14425 Hang transactions in FINISH [COMMIT] phase when communication spi is blocked. Fixes #8936
a4bec2d is described below
commit a4bec2d68cfc22962fee483efcf11919625f574d
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Mon Apr 5 00:18:22 2021 +0300
IGNITE-14425 Hang transactions in FINISH [COMMIT] phase when communication spi is blocked. Fixes #8936
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../cache/transactions/IgniteTxManager.java | 5 +-
.../transactions/TxRollbackOnTimeoutTest.java | 101 ++++++++++++++++++---
2 files changed, 91 insertions(+), 15 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index b189a3d..c3c07b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3304,8 +3304,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
tx.rollbackAsync();
});
}
- // If we could not mark tx as rollback, it means that transaction is being committed.
- else if (tx.setRollbackOnly())
+ // If we could not mark tx as rollback, it means that transaction
+ // is either being committed or already marked as rollback.
+ else if (tx.setRollbackOnly() || tx.state() == MARKED_ROLLBACK)
tx.rollbackAsync();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
index b03853e..058b8c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +46,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.util.typedef.F;
@@ -68,6 +71,8 @@ import org.junit.Test;
import static java.lang.Thread.sleep;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static org.apache.ignite.internal.util.IgniteUtils.awaitQuiet;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -314,7 +319,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
l.countDown();
- U.awaitQuiet(l);
+ awaitQuiet(l);
node1.cache(CACHE_NAME).put(2, 20);
@@ -337,7 +342,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
l.countDown();
- U.awaitQuiet(l);
+ awaitQuiet(l);
node2.cache(CACHE_NAME).put(1, 1);
@@ -546,14 +551,14 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
readStartLatch.countDown();
- U.awaitQuiet(commitLatch);
+ awaitQuiet(commitLatch);
tx.commit();
}
}
else {
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 300, 1)) {
- U.awaitQuiet(readStartLatch);
+ awaitQuiet(readStartLatch);
client.cache(CACHE_NAME).get(0); // Lock acquisition is queued.
}
@@ -712,12 +717,12 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
List<Integer> keys = movingKeysAfterJoin(grid(1), CACHE_NAME, 1);
// Delay exchange finish on server nodes if clientWait=true, or on all nodes otherwise (excluding joining node).
- TestRecordingCommunicationSpi.spi(crd).blockMessages((node,
+ spi(crd).blockMessages((node,
msg) -> node.order() < 5 && msg instanceof GridDhtPartitionsFullMessage &&
(!clientWait || node.order() != grid(1).cluster().localNode().order()));
// Delay prepare until exchange is finished.
- TestRecordingCommunicationSpi.spi(client).blockMessages((node, msg) -> {
+ spi(client).blockMessages((node, msg) -> {
boolean block = false;
if (concurrency == PESSIMISTIC) {
@@ -760,7 +765,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
IgniteInternalFuture fut1 = runAsync(new Runnable() {
@Override public void run() {
try {
- TestRecordingCommunicationSpi.spi(client).waitForBlocked(); // TX is trying to prepare on prev top ver.
+ spi(client).waitForBlocked(); // TX is trying to prepare on prev top ver.
startGrid(GRID_CNT);
}
@@ -774,10 +779,10 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
@Override public void run() {
try {
// Wait for all full messages to be ready.
- TestRecordingCommunicationSpi.spi(crd).waitForBlocked(GRID_CNT + (clientWait ? 0 : 1));
+ spi(crd).waitForBlocked(GRID_CNT + (clientWait ? 0 : 1));
// Trigger remap.
- TestRecordingCommunicationSpi.spi(client).stopBlock();
+ spi(client).stopBlock();
}
catch (Exception e) {
fail(e.getMessage());
@@ -789,7 +794,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
fut1.get(30_000);
fut2.get(30_000);
- TestRecordingCommunicationSpi.spi(crd).stopBlock();
+ spi(crd).stopBlock();
// FIXME: If using awaitPartitionMapExchange for waiting it some times fail while waiting for owners.
IgniteInternalFuture<?> topFut = ((IgniteEx)client).context().cache().context().exchange().
@@ -1052,7 +1057,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
}
// Will be unblocked after tx timeout occurs.
- U.awaitQuiet(unblocked);
+ awaitQuiet(unblocked);
try {
near.cache(CACHE_NAME).put(0, 0);
@@ -1085,7 +1090,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
IgniteInternalFuture<?> fut2 = runAsync(new Runnable() {
@Override public void run() {
- U.awaitQuiet(blocked);
+ awaitQuiet(blocked);
try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
for (int i = 0; i < recordsCnt; i++)
@@ -1155,7 +1160,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() {
@Override public void run() {
- U.awaitQuiet(blocked);
+ awaitQuiet(blocked);
try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, recordsCnt)) {
for (int i = 0; i < recordsCnt; i++)
@@ -1178,4 +1183,74 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
fut2.get();
}
+
+ /**
+ * Tests that transactions from near node that has left are rolled back on server nodes.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRollbackOnNearNodeLeft() throws Exception {
+ Ignite client = startClient();
+
+ Integer pk0 = primaryKey(grid(0).cache(CACHE_NAME));
+ Integer pk1 = primaryKey(grid(1).cache(CACHE_NAME));
+
+ CountDownLatch locked = new CountDownLatch(1);
+ CountDownLatch blocked = new CountDownLatch(1);
+
+ IgniteInternalFuture<Void> fut = runAsync(new Callable<Void>() {
+ @Override public Void call() {
+ try (Transaction tx0 = client.transactions().txStart()) {
+ client.cache(CACHE_NAME).put(pk0, 0);
+
+ locked.countDown();
+
+ awaitQuiet(blocked);
+
+ tx0.commit();
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ return null;
+ }
+ });
+
+ IgniteInternalFuture fut2 = runAsync(new Runnable() {
+ @Override public void run() {
+ try (Transaction tx1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 1000, 0)) {
+ awaitQuiet(locked);
+
+ client.cache(CACHE_NAME).put(pk1, 1);
+
+ spi(client).blockMessages((node, msg) -> msg instanceof GridNearTxFinishRequest);
+ spi(grid(0)).blockMessages((node, msg) -> msg instanceof GridNearLockResponse);
+
+ client.cache(CACHE_NAME).put(pk0, 1);
+
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(X.hasCause(e, TransactionTimeoutException.class));
+ }
+ }
+ });
+
+ spi(client).waitForBlocked();
+ spi(grid(0)).waitForBlocked();
+
+ fut2.get();
+
+ client.close();
+
+ spi(grid(0)).stopBlock();
+
+ blocked.countDown();
+
+ fut.get();
+
+ for (int i = 0; i < GRID_CNT; i++)
+ assertTrue(grid(i).context().cache().context().tm().activeTransactions().isEmpty());
+ }
}