You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/06/22 15:22:16 UTC

[ignite] branch master updated: IGNITE-13170 Java thin client: Fix transactions "withLabel" - Fixes #7951.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 16a7035  IGNITE-13170 Java thin client: Fix transactions "withLabel" - Fixes #7951.
16a7035 is described below

commit 16a70351348aefb726461f165b3649fe9df1258b
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Jun 22 20:19:55 2020 +0500

    IGNITE-13170 Java thin client: Fix transactions "withLabel" - Fixes #7951.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../client/thin/TcpClientTransactions.java         |  64 ++++++++++---
 .../org/apache/ignite/client/FunctionalTest.java   | 102 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 14 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 0044c82..3552968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -26,6 +26,7 @@ import org.apache.ignite.client.ClientTransactions;
 import org.apache.ignite.configuration.ClientTransactionConfiguration;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 
@@ -35,9 +36,6 @@ import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.TRAN
  * Implementation of {@link ClientTransactions} over TCP protocol.
  */
 class TcpClientTransactions implements ClientTransactions {
-    /** Transaction label. */
-    private String lb;
-
     /** Channel. */
     private final ReliableChannel ch;
 
@@ -65,18 +63,18 @@ class TcpClientTransactions implements ClientTransactions {
 
     /** {@inheritDoc} */
     @Override public ClientTransaction txStart() {
-        return txStart0(null, null, null);
+        return txStart0(null, null, null, null);
     }
 
     /** {@inheritDoc} */
     @Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation) {
-        return txStart0(concurrency, isolation, null);
+        return txStart0(concurrency, isolation, null, null);
     }
 
     /** {@inheritDoc} */
     @Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation,
         long timeout) {
-        return txStart0(concurrency, isolation, timeout);
+        return txStart0(concurrency, isolation, timeout, null);
     }
 
     /**
@@ -84,7 +82,8 @@ class TcpClientTransactions implements ClientTransactions {
      * @param isolation Isolation.
      * @param timeout Timeout.
      */
-    private ClientTransaction txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, Long timeout) {
+    private ClientTransaction txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, Long timeout,
+        String lb) {
         TcpClientTransaction tx0 = tx();
 
         if (tx0 != null)
@@ -118,14 +117,9 @@ class TcpClientTransactions implements ClientTransactions {
 
     /** {@inheritDoc} */
     @Override public ClientTransactions withLabel(String lb) {
-        if (lb == null)
-            throw new NullPointerException();
-
-        TcpClientTransactions txs = new TcpClientTransactions(ch, marsh, txCfg);
+        A.notNull(lb, "lb");
 
-        txs.lb = lb;
-
-        return txs;
+        return new ClientTransactionsWithLabel(lb);
     }
 
     /**
@@ -144,6 +138,48 @@ class TcpClientTransactions implements ClientTransactions {
     }
 
     /**
+     * Transactions "withLabel" facade.
+     */
+    private class ClientTransactionsWithLabel implements ClientTransactions {
+        /** Transaction label. */
+        private final String lb;
+
+        /**
+         * @param lb Transaction's label.
+         */
+        ClientTransactionsWithLabel(String lb) {
+            this.lb = lb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientTransaction txStart() throws ClientServerError, ClientException {
+            return txStart0(null, null, null, lb);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation)
+            throws ClientServerError, ClientException {
+            return txStart0(concurrency, isolation, null, lb);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation,
+            long timeout) throws ClientServerError, ClientException {
+            return txStart0(concurrency, isolation, timeout, lb);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClientTransactions withLabel(String lb) throws ClientException {
+            A.notNull(lb, "lb");
+
+            if (lb.equals(this.lb))
+                return this;
+
+            return new ClientTransactionsWithLabel(lb);
+        }
+    }
+
+    /**
      *
      */
     class TcpClientTransaction implements ClientTransaction {
diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
index 4be745c..b23c670 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
@@ -55,6 +55,8 @@ import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.client.thin.ClientServerError;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
@@ -63,11 +65,14 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.spi.systemview.view.TransactionView;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
 import static org.apache.ignite.testframework.junits.GridAbstractTest.getMxBean;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -952,6 +957,103 @@ public class FunctionalTest {
     }
 
     /**
+     * Test transactions with label.
+     */
+    @Test
+    public void testTransactionsWithLabel() throws Exception {
+        try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration());
+             IgniteClient client = Ignition.startClient(getClientConfiguration())
+        ) {
+            ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
+                .setName("cache")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            );
+
+            SystemView<TransactionView> txsView = ignite.context().systemView().view(TXS_MON_LIST);
+
+            cache.put(0, "value1");
+
+            try (ClientTransaction tx = client.transactions().withLabel("label").txStart()) {
+                cache.put(0, "value2");
+
+                assertEquals(1, F.size(txsView.iterator()));
+
+                TransactionView txv = txsView.iterator().next();
+
+                assertEquals("label", txv.label());
+
+                assertEquals("value2", cache.get(0));
+            }
+
+            assertEquals("value1", cache.get(0));
+
+            try (ClientTransaction tx = client.transactions().withLabel("label1").withLabel("label2").txStart()) {
+                cache.put(0, "value2");
+
+                assertEquals(1, F.size(txsView.iterator()));
+
+                TransactionView txv = txsView.iterator().next();
+
+                assertEquals("label2", txv.label());
+
+                tx.commit();
+            }
+
+            assertEquals("value2", cache.get(0));
+
+            // Test concurrent transactions with and without a label.
+            try (ClientTransaction tx = client.transactions().withLabel("label").txStart(PESSIMISTIC, READ_COMMITTED)) {
+                CyclicBarrier barrier = new CyclicBarrier(2);
+
+                cache.put(0, "value3");
+
+                IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+                    try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                        cache.put(1, "value3");
+
+                        barrier.await();
+
+                        assertEquals("value2", cache.get(0));
+
+                        barrier.await();
+                    }
+                    catch (InterruptedException | BrokenBarrierException ignore) {
+                        // No-op.
+                    }
+                });
+
+                barrier.await();
+
+                assertNull(cache.get(1));
+
+                assertEquals(1, F.size(txsView.iterator(), txv -> txv.label() == null));
+                assertEquals(1, F.size(txsView.iterator(), txv -> "label".equals(txv.label())));
+
+                barrier.await();
+
+                fut.get();
+            }
+
+            // Test that nested transactions are not possible.
+            try (ClientTransaction tx = client.transactions().withLabel("label1").txStart()) {
+                try (ClientTransaction tx1 = client.transactions().txStart()) {
+                    fail();
+                }
+                catch (ClientException expected) {
+                    // No-op.
+                }
+
+                try (ClientTransaction tx1 = client.transactions().withLabel("label2").txStart()) {
+                    fail();
+                }
+                catch (ClientException expected) {
+                    // No-op.
+                }
+            }
+        }
+    }
+
+    /**
      * Test cache with expire policy.
      */
     @Test