You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/09/22 15:00:32 UTC

[pulsar] branch master updated: [improve][txn] Add getState in transaction for client API (#17423)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a9531dbfafe [improve][txn] Add getState in transaction for client API (#17423)
a9531dbfafe is described below

commit a9531dbfafe73e5f351ae3f526892ffdb55129b2
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 22 23:00:24 2022 +0800

    [improve][txn] Add getState in transaction for client API (#17423)
    
    ### Motivation
    Currently, `org.apache.pulsar.client.api.transaction.Transaction` does not provide a method for users to get the transaction state.
    
    With this change, users can read the transaction state and use it to drive their own operations.
    ### Modifications
    1. Add the `getState` method to the `org.apache.pulsar.client.api.transaction.Transaction` interface.
    2. Implement `getState` in `TransactionImpl`.
    ```
        /**
         * Get transaction state.
         *
         * @return {@link State} the state of the transaction.
         */
        State getState();
    ```
    ### Verifying this change
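    Added `TransactionTest#testGetTxnState`, which checks the `OPEN`, `TIME_OUT`, `ABORTED`, `COMMITTED`, `ERROR`, `COMMITTING` and `ABORTING` states.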
---
 .../pulsar/broker/transaction/TransactionTest.java | 48 ++++++++++++++++++
 .../client/impl/TransactionEndToEndTest.java       |  2 +-
 .../pulsar/client/api/transaction/Transaction.java | 57 ++++++++++++++++++++++
 .../client/impl/transaction/TransactionImpl.java   | 17 +++----
 4 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ffc351e9413..c309f69fd56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1469,4 +1469,52 @@ public class TransactionTest extends TransactionTestBase {
             Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
         }
     }
+
+    @Test
+    public void testGetTxnState() throws Exception {
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+
+        // test OPEN and TIMEOUT
+        assertEquals(transaction.getState(), Transaction.State.OPEN);
+        Transaction timeoutTxn = transaction;
+        Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);
+
+        // test abort
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        transaction.abort().get();
+        assertEquals(transaction.getState(), Transaction.State.ABORTED);
+
+        // test commit
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        transaction.commit().get();
+        assertEquals(transaction.getState(), Transaction.State.COMMITTED);
+
+        // test error
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+        pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .endTransaction(transaction.getTxnID(), 0, false);
+        transaction.commit();
+        Transaction errorTxn = transaction;
+        Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);
+
+        // test committing
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+        transaction.commit();
+        Transaction committingTxn = transaction;
+        Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);
+
+        // test aborting
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+        transaction.abort();
+        Transaction abortingTxn = transaction;
+        Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index ddb9062454b..aea77bec136 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1052,7 +1052,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 .build().get();
         producer.newMessage().send();
         Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
         });
 
         try {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
index fd4cf0bc166..33e96d5c276 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
@@ -29,6 +29,55 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public interface Transaction {
 
+    enum State {
+
+        /**
+         * When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
+         *
+         * When a transaction is in the `OPEN` state, it can commit or abort.
+         */
+        OPEN,
+
+        /**
+         * When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
+         */
+        COMMITTING,
+
+        /**
+         * When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
+         */
+        ABORTING,
+
+        /**
+         * When a client receives a response to a commit, the transaction state is changed from
+         * `COMMITTING` to `COMMITTED`.
+         */
+        COMMITTED,
+
+        /**
+         * When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
+         */
+        ABORTED,
+
+        /**
+         * When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
+         * then the state is changed to `ERROR`.
+         *
+         * When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
+         * then the state is changed to `ERROR`.
+         *
+         * When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
+         * then the state is changed to `ERROR`.
+         */
+        ERROR,
+
+        /**
+         * When a transaction is timed out and the transaction state is `OPEN`,
+         * then the transaction state is changed from `OPEN` to `TIME_OUT`.
+         */
+        TIME_OUT
+    }
+
     /**
      * Commit the transaction.
      *
@@ -48,4 +97,12 @@ public interface Transaction {
      *  @return {@link TxnID} the txnID.
      */
     TxnID getTxnID();
+
+    /**
+     * Get transaction state.
+     *
+     * @return {@link State} the state of the transaction.
+     */
+    State getState();
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 55b20438693..833b0957d1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public void run(Timeout timeout) throws Exception {
-        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
-    }
-
-    public enum State {
-        OPEN,
-        COMMITTING,
-        ABORTING,
-        COMMITTED,
-        ABORTED,
-        ERROR,
-        TIMEOUT
+        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
     }
 
     TransactionImpl(PulsarClientImpl client,
@@ -215,6 +205,11 @@ public class TransactionImpl implements Transaction , TimerTask {
         return new TxnID(txnIdMostBits, txnIdLeastBits);
     }
 
+    @Override
+    public State getState() {
+        return state;
+    }
+
     public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
         if (state == State.OPEN) {
             return true;