Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/22 21:30:08 UTC

[ignite] branch master updated: IGNITE-14423 Fixed node failure caused by AssertionError: Transaction does not own lock for update. Fixes #8935

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new bec58b0  IGNITE-14423 Fixed node failure caused by AssertionError: Transaction does not own lock for update. Fixes #8935
bec58b0 is described below

commit bec58b00e0c611448ac15841b4b6831a3dcfd296
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Fri Apr 23 00:29:18 2021 +0300

    IGNITE-14423 Fixed node failure caused by AssertionError: Transaction does not own lock for update. Fixes #8935
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../distributed/dht/GridDhtTxPrepareFuture.java    |   8 +-
 .../TxRecoveryWithConcurrentRollbackTest.java      | 140 +++++++++++++++++++--
 2 files changed, 137 insertions(+), 11 deletions(-)
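
For context, judging from the commit message and the new test: the assertion
"Transaction does not own lock for update" fired when a DHT prepare had already
failed (for example, with a lock acquisition timeout) but onComplete() still
moved the transaction to the PREPARED state, so recovery later touched locks
the transaction never owned. The first hunk below makes
GridDhtTxPrepareFuture.onComplete() check the prepare response for an error and
mark the transaction rollback-only instead. A minimal sketch of the resulting
decision, reduced from that hunk (the null guard on res is added here only to
keep the sketch self-contained; the patch itself dereferences res directly):

    // Simplified sketch of the fixed onComplete() decision; the fields
    // (tx, last, err) come from the surrounding class, see the hunk below.
    private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
        if (res != null && res.error() != null) {
            // Prepare failed: the tx must never reach PREPARED, otherwise
            // recovery may later assert on a lock the tx does not own.
            tx.setRollbackOnly();
        }
        else if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local())))
            tx.state(PREPARED);

        // Completing the compound future itself is unchanged (see the hunk).
        return super.onDone(res, res == null ? err : null);
    }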

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c362c31..8607458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1033,7 +1033,13 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      * @return {@code True} if {@code done} flag was changed as a result of this call.
      */
     private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
-        if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local())))
+        if (res.error() != null) {
+            if (log.isDebugEnabled())
+                log.debug("Transaction marked for rollback because of error on dht prepare [tx=" + tx + ", error=" + res.error() + "]");
+
+            tx.setRollbackOnly();
+        }
+        else if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local())))
             tx.state(PREPARED);
 
         if (super.onDone(res, res == null ? err : null)) {
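
Note that the patch marks the transaction rollback-only rather than rolling it
back inside onComplete(): the flag only forbids the PREPARED transition, and
the actual rollback is left to the regular finish path. A hypothetical
caller-side check of that flag could look like this (illustrative only;
isRollbackOnly() is assumed to be the flag's getter and is not part of this
commit):

    // Hypothetical guard: a rollback-only tx should fail fast on a commit attempt.
    if (tx.isRollbackOnly())
        throw new IgniteCheckedException("Transaction is marked for rollback: " + tx);
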
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
index 0367945..53c41b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
@@ -19,7 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -27,12 +31,15 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -46,7 +53,11 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 
@@ -79,9 +90,11 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
         }
 
         cfg.setActiveOnStart(false);
-        cfg.setClientMode("client".equals(name));
+        cfg.setClientMode(name.startsWith("client"));
         cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
 
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
         cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
             setCacheMode(PARTITIONED).
             setBackups(backups).
@@ -119,7 +132,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
         persistence = false;
 
         final IgniteEx node0 = startGrids(3);
-        node0.cluster().active(true);
+        node0.cluster().state(ACTIVE);
 
         final Ignite client = startGrid("client");
 
@@ -169,9 +182,9 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
             assertTrue(txs2.size() == 2);
 
             // Prevent recovery request for grid1 tx branch to go to grid0.
-            TestRecordingCommunicationSpi.spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name());
+            spi(grid(1)).blockMessages(GridCacheTxRecoveryRequest.class, grid(0).name());
             // Prevent finish(false) request processing on node0.
-            TestRecordingCommunicationSpi.spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name());
+            spi(client).blockMessages(GridNearTxFinishRequest.class, grid(0).name());
 
             int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
 
@@ -183,7 +196,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
             grid(1).context().getStripedExecutorService().execute(stripe, () -> {});
 
             runAsync(() -> {
-                TestRecordingCommunicationSpi.spi(client).waitForBlocked();
+                spi(client).waitForBlocked();
 
                 client.close();
 
@@ -213,7 +226,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
                 grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
 
             // Unblock recovery message from g1 to g0 because tx is in RECOVERY_FINISH state and waits for recovery end.
-            TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+            spi(grid(1)).stopBlock();
 
             txs0.get(0).finishFuture().get();
             txs1.get(0).finishFuture().get();
@@ -234,7 +247,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
             grid(1).context().getStripedExecutorService().queueStripeSize(stripeHolder[0]) == 0, 5_000));
 
         // Proceed with recovery on grid1 -> grid0. Tx0 is committed so tx1 also should be committed.
-        TestRecordingCommunicationSpi.spi(grid(1)).stopBlock();
+        spi(grid(1)).stopBlock();
 
         assertNotNull(txs1);
         txs1.get(0).finishFuture().get();
@@ -275,7 +288,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
         this.syncMode = syncMode;
 
         final IgniteEx node0 = startGrids(3);
-        node0.cluster().active(true);
+        node0.cluster().state(ACTIVE);
 
         final Ignite client = startGrid("client");
 
@@ -297,10 +310,10 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
             tx0 = txs(grid(0));
             tx2 = txs(grid(2));
 
-            TestRecordingCommunicationSpi.spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest);
+            spi(grid(1)).blockMessages((node, msg) -> msg instanceof GridDhtTxFinishRequest);
 
             fut = runAsync(() -> {
-                TestRecordingCommunicationSpi.spi(grid(1)).waitForBlocked(2);
+                spi(grid(1)).waitForBlocked(2);
 
                 client.close();
                 grid(1).close();
@@ -337,4 +350,111 @@ public class TxRecoveryWithConcurrentRollbackTest extends GridCommonAbstractTest
     private List<IgniteInternalTx> txs(IgniteEx g) {
         return new ArrayList<>(g.context().cache().context().tm().activeTransactions());
     }
+
+    /**
+     * Start 3 servers,
+     * start 2 clients,
+     * start two OPTIMISTIC transactions with the same key from different client nodes,
+     * try to transfer both to the PREPARED state,
+     * then stop one client node.
+     */
+    @Test
+    public void testTxDoesntBecomePreparedAfterError() throws Exception {
+        backups = 2;
+        persistence = true;
+        syncMode = FULL_ASYNC;
+
+        final IgniteEx node0 = startGrids(3);
+
+        node0.cluster().state(ACTIVE);
+
+        final IgniteEx client1 = startGrid("client1");
+        final IgniteEx client2 = startGrid("client2");
+
+        awaitPartitionMapExchange();
+
+        final IgniteCache<Object, Object> cache = client1.cache(DEFAULT_CACHE_NAME);
+        final IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME);
+
+        final Integer pk = primaryKey(node0.cache(DEFAULT_CACHE_NAME));
+
+        CountDownLatch txPrepareLatch = new CountDownLatch(1);
+
+        GridTestUtils.runMultiThreadedAsync(() -> {
+            try (final Transaction tx = client1.transactions().withLabel("tx1").txStart(OPTIMISTIC, READ_COMMITTED, 5000, 1)) {
+                cache.put(pk, Boolean.TRUE);
+
+                TransactionProxyImpl p = (TransactionProxyImpl)tx;
+
+                // Block the near finish request so the tx is not rolled back on exit from the try-with-resources block; this should make the tx fail with another timeout.
+                spi(client1).blockMessages((node, msg) -> msg instanceof GridNearTxFinishRequest);
+
+                log.info("Test, preparing tx: xid=" + tx.xid() + ", tx=" + tx);
+
+                // Do only the prepare, to try to lock the key; a commit is not needed here.
+                p.tx().prepareNearTxLocal();
+
+                p.tx().currentPrepareFuture().listen(fut -> txPrepareLatch.countDown());
+            } catch (Exception e) {
+                // No-op.
+            }
+        }, 1, "tx1-thread");
+
+        try (final Transaction tx = client2.transactions().withLabel("tx2").txStart(OPTIMISTIC, READ_COMMITTED, 5000, 1)) {
+            cache2.put(pk, Boolean.TRUE);
+
+            TransactionProxyImpl p = (TransactionProxyImpl)tx;
+
+            log.info("Test, preparing tx: xid=" + tx.xid() + ", tx=" + tx);
+
+            p.tx().prepareNearTxLocal();
+
+            p.tx().currentPrepareFuture().listen(fut -> txPrepareLatch.countDown());
+
+            txPrepareLatch.await(6, TimeUnit.SECONDS);
+
+            if (txPrepareLatch.getCount() > 0)
+                fail("Failed to await for tx prepare.");
+
+            AtomicReference<GridDhtTxLocal> dhtTxLocRef = new AtomicReference<>();
+
+            assertTrue(waitForCondition(() -> {
+                dhtTxLocRef.set((GridDhtTxLocal) txs(node0).stream()
+                    .filter(t -> t.state() == TransactionState.PREPARING)
+                    .findFirst()
+                    .orElse(null)
+                );
+
+                return dhtTxLocRef.get() != null;
+            }, 6_000));
+
+            assertNotNull(dhtTxLocRef.get());
+
+            UUID clientNodeToFail = dhtTxLocRef.get().eventNodeId();
+
+            GridDhtTxPrepareFuture prep = GridTestUtils.getFieldValue(dhtTxLocRef.get(), "prepFut");
+
+            prep.get();
+
+            List<IgniteInternalTx> txs = txs(node0);
+
+            String txsStr = txs.stream().map(Object::toString).collect(Collectors.joining(", "));
+
+            log.info("Transactions check point [count=" + txs.size() + ", txs=" + txsStr + "]");
+
+            if (clientNodeToFail.equals(client1.localNode().id()))
+                client1.close();
+            else if (clientNodeToFail.equals(client2.localNode().id()))
+                client2.close();
+        }
+        catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+
+        U.sleep(500);
+
+        assertEquals(3, grid(1).context().discovery().aliveServerNodes().size());
+
+        assertEquals(txs(client1).toString() + ", " + txs(client2).toString(), 1, txs(client1).size() + txs(client2).size());
+    }
 }
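
At its core, the client-side pattern the new test drives is two optimistic
transactions from different clients contending on the same key with a short
timeout, so that one prepare fails with a lock timeout on the primary node. A
minimal sketch of that pattern using only the public API (ignite, cache and
key are assumed to be set up as in the test above):

    // Minimal client-side sketch of the contention pattern (public API only).
    try (Transaction tx = ignite.transactions().txStart(
        TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED,
        5_000 /* timeout, ms */, 1 /* expected tx size */)) {
        cache.put(key, Boolean.TRUE);

        // May fail with a timeout if a concurrent tx holds the key's lock at prepare time.
        tx.commit();
    }
    catch (Exception e) {
        // With the fix, a failed prepare marks the tx rollback-only instead of
        // letting it reach PREPARED, so recovery no longer trips the assertion.
    }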