You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/10/27 06:55:18 UTC

[ignite] branch master updated: IGNITE-15732 Thin client: Fix transaction failure after timeout - Fixes #9504.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d67297  IGNITE-15732 Thin client: Fix transaction failure after timeout - Fixes #9504.
2d67297 is described below

commit 2d67297c9284252dc6ea3f46f5cdc7f06177fb3e
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Oct 27 09:51:48 2021 +0300

    IGNITE-15732 Thin client: Fix transaction failure after timeout - Fixes #9504.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../platform/client/tx/ClientTxContext.java        | 18 ++++++---
 .../org/apache/ignite/client/FunctionalTest.java   | 44 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java
index 2c430aa..626c6bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java
@@ -63,13 +63,21 @@ public class ClientTxContext {
      */
     public void release(boolean suspendTx) throws IgniteCheckedException {
         try {
-            if (suspendTx) {
-                TransactionState state = tx.state();
+            try {
+                if (suspendTx) {
+                    TransactionState state = tx.state();
 
-                if (state != TransactionState.COMMITTED && state != TransactionState.ROLLED_BACK)
-                    tx.suspend();
+                    if (state == TransactionState.ACTIVE)
+                        tx.suspend();
+                }
             }
-        } finally {
+            finally {
+                // In some cases thread can still hold the transaction (due to concurrent rollbacks), threadMap should
+                // be forcibly cleared to avoid problems with resuming other transactions in the current worker.
+                tx.context().tm().clearThreadMap(tx);
+            }
+        }
+        finally {
             lock.unlock();
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
index 8a07ebd..673dc69 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -75,6 +76,7 @@ import org.apache.ignite.mxbean.ClientProcessorMXBean;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.spi.systemview.view.TransactionView;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,6 +84,7 @@ import org.junit.rules.Timeout;
 
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.junits.GridAbstractTest.getMxBean;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -797,6 +800,47 @@ public class FunctionalTest {
     }
 
     /**
+     * Test that client-connector worker can process further transactional requests (resume transactions) after
+     * external termination of previous transaction.
+     */
+    @Test
+    public void testTxResumeAfterTxTimeout() throws Exception {
+        IgniteConfiguration cfg = Config.getServerConfiguration().setClientConnectorConfiguration(
+            new ClientConnectorConfiguration().setThreadPoolSize(1));
+
+        try (Ignite ignite = Ignition.start(cfg); IgniteClient client = Ignition.startClient(getClientConfiguration())) {
+            String cacheName = "cache";
+
+            IgniteCache<Object, Object> igniteCache = ignite.createCache(new CacheConfiguration<>(cacheName)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+            try (ClientTransaction clientTx = client.transactions().txStart()) {
+                runAsync(() -> {
+                    try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                        igniteCache.put(0, 0); // Lock key by ignite node.
+
+                        try {
+                            // Start, but don't close the transaction (to keep it in the threadMap after timeout).
+                            client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 200L);
+
+                            // Wait until transaction interrupted externally by timeout.
+                            client.cache(cacheName).put(0, 0);
+
+                            fail();
+                        }
+                        catch (ClientException ignored) {
+                            // Expected.
+                        }
+                    }
+                }).get();
+
+                // Resume tx in the worker with interrupted transaction.
+                assertFalse(client.cache(cacheName).containsKey(0));
+            }
+        }
+    }
+
+    /**
      * Test transactions.
      */
     @Test