You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/22 21:30:08 UTC
[ignite] branch master updated: IGNITE-14423 Fixed node failure caused by AssertionError: Transaction does not own lock for update. Fixes #8935
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bec58b0 IGNITE-14423 Fixed node failure caused by AssertionError: Transaction does not own lock for update. Fixes #8935
bec58b0 is described below
commit bec58b00e0c611448ac15841b4b6831a3dcfd296
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Fri Apr 23 00:29:18 2021 +0300
IGNITE-14423 Fixed node failure caused by AssertionError: Transaction does not own lock for update. Fixes #8935
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../distributed/dht/GridDhtTxPrepareFuture.java | 8 +-
.../TxRecoveryWithConcurrentRollbackTest.java | 140 +++++++++++++++++++--
2 files changed, 137 insertions(+), 11 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c362c31..8607458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1033,7 +1033,13 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
* @return {@code True} if {@code done} flag was changed as a result of this call.
*/
private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
- if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local())))
+ if (res.error() != null) {
+ if (log.isDebugEnabled())
+ log.debug("Transaction marked for rollback because of error on dht prepare [tx=" + tx + ", error=" + res.error() + "]");
+
+ tx.setRollbackOnly();
+ }
+ else if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local())))
tx.state(PREPARED);
if (super.onDone(res, res == null ? err : null)) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
index 0367945..53c41b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -27,12 +31,15 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
@@ -46,7 +53,11 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
@@ -79,9 +90,11 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
}
cfg.setActiveOnStart(false);
- cfg.setClientMode("client".equals(name));
+ cfg.setClientMode(name.startsWith("client"));
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+
cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
setCacheMode(PARTITIONED).
setBackups(backups).
@@ -119,7 +132,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
persistence = false;
final IgniteEx node0 = startGrids(3);
- node0.cluster().active(true);
+ node0.cluster().state(ACTIVE);
final Ignite client = startGrid("client");
@@ -169,9 +182,9 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
assertTrue(txs2.size() == 2);
// Prevent recovery request for grid1 tx branch to go to grid0.
- TestRecordingCommunicationSpi.spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name());
+ spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name());
// Prevent finish(false) request processing on node0.
- TestRecordingCommunicationSpi.spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name());
+ spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name());
int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
@@ -183,7 +196,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
grid(1).context().getStripedExecutorService().execute(stripe, () -> {});
runAsync(() -> {
- TestRecordingCommunicationSpi.spi(client).waitForBlocked();
+ spi(client).waitForBlocked();
client.close();
@@ -213,7 +226,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
// Unblock recovery message from g1 to g0 because tx is in RECOVERY_FINISH state and waits for recovery end.
- TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+ spi(grid(1)).stopBlock();
txs0.get(0).finishFuture().get();
txs1.get(0).finishFuture().get();
@@ -234,7 +247,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
// Proceed with recovery on grid1 -> grid0. Tx0 is committed so tx1 also should be committed.
- TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+ spi(grid(1)).stopBlock();
assertNotNull(txs1);
txs1.get(0).finishFuture().get();
@@ -275,7 +288,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
this.syncMode = syncMode;
final IgniteEx node0 = startGrids(3);
- node0.cluster().active(true);
+ node0.cluster().state(ACTIVE);
final Ignite client = startGrid("client");
@@ -297,10 +310,10 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
tx0 = txs(grid(0));
tx2 = txs(grid(2));
- TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest);
+ spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest);
fut = runAsync(() -> {
- TestRecordingCommunicationSpi.spi(grid(1)).waitForBlocked(2);
+ spi(grid(1)).waitForBlocked(2);
client.close();
grid(1).close();
@@ -337,4 +350,111 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
private List<IgniteInternalTx> txs(IgniteEx g) {
return new ArrayList<>(g.context().cache().context().tm().activeTransactions());
}
+
+    /**
+     * Regression test for IGNITE-14423: starts 3 servers (persistence enabled) and 2 clients, then runs two
+     * OPTIMISTIC transactions over the same primary key of node0 from different clients and prepares both.
+     * After the dht prepare future of the PREPARING transaction on node0 completes, the client whose id matches
+     * that transaction's eventNodeId() is stopped. Checks that all 3 servers stay alive (a StopNodeFailureHandler
+     * is configured) and that the two clients together report exactly one active transaction.
+     */
+    @Test
+    public void testTxDoesntBecomePreparedAfterError() throws Exception {
+        backups = 2;
+        persistence = true;
+        syncMode = FULL_ASYNC;
+
+        final IgniteEx node0 = startGrids(3);
+
+        node0.cluster().state(ACTIVE);
+
+        final IgniteEx client1 = startGrid("client1");
+        final IgniteEx client2 = startGrid("client2");
+
+        awaitPartitionMapExchange();
+
+        final IgniteCache<Object, Object> cache = client1.cache(DEFAULT_CACHE_NAME);
+        final IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME);
+
+        final Integer pk = primaryKey(node0.cache(DEFAULT_CACHE_NAME));
+
+        CountDownLatch txPrepareLatch = new CountDownLatch(1);
+
+        GridTestUtils.runMultiThreadedAsync(() -> {
+            try (final Transaction tx = client1.transactions().withLabel("tx1").txStart(OPTIMISTIC, READ_COMMITTED, 5000, 1)) {
+                cache.put(pk, Boolean.TRUE);
+
+                TransactionProxyImpl p = (TransactionProxyImpl)tx;
+
+                // Block near finish requests from client1 so closing tx1 can't roll it back; it should time out instead.
+                spi(client1).blockMessages((node, msg) -> msg instanceof GridNearTxFinishRequest);
+
+                log.info("Test, preparing tx: xid=" + tx.xid() + ", tx=" + tx);
+
+                // Prepare only, to try to lock the key; commit is deliberately not performed.
+                p.tx().prepareNearTxLocal();
+
+                p.tx().currentPrepareFuture().listen(fut -> txPrepareLatch.countDown());
+            } catch (Exception e) {
+                // Ignored: tx1 is expected to fail (timeout/rollback); assertions are made by the main test thread.
+            }
+        }, 1, "tx1-thread");
+
+        try (final Transaction tx = client2.transactions().withLabel("tx2").txStart(OPTIMISTIC, READ_COMMITTED, 5000, 1)) {
+            cache2.put(pk, Boolean.TRUE);
+
+            TransactionProxyImpl p = (TransactionProxyImpl)tx;
+
+            log.info("Test, preparing tx: xid=" + tx.xid() + ", tx=" + tx);
+
+            p.tx().prepareNearTxLocal();
+
+            p.tx().currentPrepareFuture().listen(fut -> txPrepareLatch.countDown());
+
+            txPrepareLatch.await(6, TimeUnit.SECONDS);
+
+            if (txPrepareLatch.getCount() > 0)
+                fail("Failed to await for tx prepare.");
+
+            AtomicReference<GridDhtTxLocal> dhtTxLocRef = new AtomicReference<>();
+
+            assertTrue(waitForCondition(() -> {
+                dhtTxLocRef.set((GridDhtTxLocal) txs(node0).stream()
+                    .filter(t -> t.state() == TransactionState.PREPARING)
+                    .findFirst()
+                    .orElse(null)
+                );
+
+                return dhtTxLocRef.get() != null;
+            }, 6_000));
+
+            assertNotNull(dhtTxLocRef.get());
+
+            UUID clientNodeToFail = dhtTxLocRef.get().eventNodeId();
+
+            GridDhtTxPrepareFuture prep = GridTestUtils.getFieldValue(dhtTxLocRef.get(), "prepFut");
+
+            prep.get();
+
+            List<IgniteInternalTx> txs = txs(node0);
+
+            String txsStr = txs.stream().map(Object::toString).collect(Collectors.joining(", "));
+
+            log.info("Transactions check point [count=" + txs.size() + ", txs=" + txsStr + "]");
+
+            if (clientNodeToFail.equals(client1.localNode().id()))
+                client1.close();
+            else if (clientNodeToFail.equals(client2.localNode().id()))
+                client2.close();
+        }
+        catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+
+        U.sleep(500);
+
+        assertEquals(3, grid(1).context().discovery().aliveServerNodes().size());
+
+        assertEquals(txs(client1).toString() + ", " + txs(client2).toString(), 1, txs(client1).size() + txs(client2).size());
+    }
}