You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/06/22 15:22:16 UTC
[ignite] branch master updated: IGNITE-13170 Java thin client: Fix
transactions "withLabel" - Fixes #7951.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 16a7035 IGNITE-13170 Java thin client: Fix transactions "withLabel" - Fixes #7951.
16a7035 is described below
commit 16a70351348aefb726461f165b3649fe9df1258b
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Mon Jun 22 20:19:55 2020 +0500
IGNITE-13170 Java thin client: Fix transactions "withLabel" - Fixes #7951.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../client/thin/TcpClientTransactions.java | 64 ++++++++++---
.../org/apache/ignite/client/FunctionalTest.java | 102 +++++++++++++++++++++
2 files changed, 152 insertions(+), 14 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 0044c82..3552968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -26,6 +26,7 @@ import org.apache.ignite.client.ClientTransactions;
import org.apache.ignite.configuration.ClientTransactionConfiguration;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -35,9 +36,6 @@ import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.TRAN
* Implementation of {@link ClientTransactions} over TCP protocol.
*/
class TcpClientTransactions implements ClientTransactions {
- /** Transaction label. */
- private String lb;
-
/** Channel. */
private final ReliableChannel ch;
@@ -65,18 +63,18 @@ class TcpClientTransactions implements ClientTransactions {
/** {@inheritDoc} */
@Override public ClientTransaction txStart() {
- return txStart0(null, null, null);
+ return txStart0(null, null, null, null);
}
/** {@inheritDoc} */
@Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation) {
- return txStart0(concurrency, isolation, null);
+ return txStart0(concurrency, isolation, null, null);
}
/** {@inheritDoc} */
@Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation,
long timeout) {
- return txStart0(concurrency, isolation, timeout);
+ return txStart0(concurrency, isolation, timeout, null);
}
/**
@@ -84,7 +82,8 @@ class TcpClientTransactions implements ClientTransactions {
* @param isolation Isolation.
* @param timeout Timeout.
*/
- private ClientTransaction txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, Long timeout) {
+ private ClientTransaction txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, Long timeout,
+ String lb) {
TcpClientTransaction tx0 = tx();
if (tx0 != null)
@@ -118,14 +117,9 @@ class TcpClientTransactions implements ClientTransactions {
/** {@inheritDoc} */
@Override public ClientTransactions withLabel(String lb) {
- if (lb == null)
- throw new NullPointerException();
-
- TcpClientTransactions txs = new TcpClientTransactions(ch, marsh, txCfg);
+ A.notNull(lb, "lb");
- txs.lb = lb;
-
- return txs;
+ return new ClientTransactionsWithLabel(lb);
}
/**
@@ -144,6 +138,48 @@ class TcpClientTransactions implements ClientTransactions {
}
/**
+ * Facade returned by {@code withLabel}: starts transactions through the outer {@code txStart0} with a fixed label.
+ */
+ private class ClientTransactionsWithLabel implements ClientTransactions {
+ /** Label passed to {@code txStart0} for every transaction started via this facade. */
+ private final String lb;
+
+ /**
+ * @param lb Label to use for transactions started via this facade.
+ */
+ ClientTransactionsWithLabel(String lb) {
+ this.lb = lb;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientTransaction txStart() throws ClientServerError, ClientException {
+ return txStart0(null, null, null, lb);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation)
+ throws ClientServerError, ClientException {
+ return txStart0(concurrency, isolation, null, lb);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientTransaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation,
+ long timeout) throws ClientServerError, ClientException {
+ return txStart0(concurrency, isolation, timeout, lb);
+ }
+
+ /** {@inheritDoc} Returns {@code this} if the label is unchanged, otherwise a new facade with the new label. */
+ @Override public ClientTransactions withLabel(String lb) throws ClientException {
+ A.notNull(lb, "lb");
+
+ if (lb.equals(this.lb))
+ return this;
+
+ return new ClientTransactionsWithLabel(lb);
+ }
+ }
+
+ /**
*
*/
class TcpClientTransaction implements ClientTransaction {
diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
index 4be745c..b23c670 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
@@ -55,6 +55,8 @@ import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
@@ -63,11 +65,14 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
import static org.apache.ignite.testframework.junits.GridAbstractTest.getMxBean;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -952,6 +957,103 @@ public class FunctionalTest {
}
/**
+ * Tests labeled transactions: label in the transactions view, label override, concurrency and nesting.
+ */
+ @Test
+ public void testTransactionsWithLabel() throws Exception {
+ try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration());
+ IgniteClient client = Ignition.startClient(getClientConfiguration())
+ ) {
+ ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
+ .setName("cache")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ );
+
+ SystemView<TransactionView> txsView = ignite.context().systemView().view(TXS_MON_LIST);
+
+ cache.put(0, "value1");
+
+ try (ClientTransaction tx = client.transactions().withLabel("label").txStart()) {
+ cache.put(0, "value2");
+
+ assertEquals(1, F.size(txsView.iterator()));
+
+ TransactionView txv = txsView.iterator().next();
+
+ assertEquals("label", txv.label());
+
+ assertEquals("value2", cache.get(0));
+ }
+
+ assertEquals("value1", cache.get(0)); // Closed without commit, so the original value is expected.
+
+ try (ClientTransaction tx = client.transactions().withLabel("label1").withLabel("label2").txStart()) {
+ cache.put(0, "value2");
+
+ assertEquals(1, F.size(txsView.iterator()));
+
+ TransactionView txv = txsView.iterator().next();
+
+ assertEquals("label2", txv.label());
+
+ tx.commit();
+ }
+
+ assertEquals("value2", cache.get(0));
+
+ // Test a labeled and an unlabeled transaction running concurrently.
+ try (ClientTransaction tx = client.transactions().withLabel("label").txStart(PESSIMISTIC, READ_COMMITTED)) {
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ cache.put(0, "value3");
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ cache.put(1, "value3");
+
+ barrier.await();
+
+ assertEquals("value2", cache.get(0));
+
+ barrier.await();
+ }
+ catch (InterruptedException | BrokenBarrierException ignore) {
+ // No-op.
+ }
+ });
+
+ barrier.await();
+
+ assertNull(cache.get(1));
+
+ assertEquals(1, F.size(txsView.iterator(), txv -> txv.label() == null));
+ assertEquals(1, F.size(txsView.iterator(), txv -> "label".equals(txv.label())));
+
+ barrier.await();
+
+ fut.get();
+ }
+
+ // Test that nested transactions are not possible.
+ try (ClientTransaction tx = client.transactions().withLabel("label1").txStart()) {
+ try (ClientTransaction tx1 = client.transactions().txStart()) {
+ fail();
+ }
+ catch (ClientException expected) {
+ // No-op.
+ }
+
+ try (ClientTransaction tx1 = client.transactions().withLabel("label2").txStart()) {
+ fail();
+ }
+ catch (ClientException expected) {
+ // No-op.
+ }
+ }
+ }
+ }
+
+ /**
* Test cache with expire policy.
*/
@Test