You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/03/16 08:27:41 UTC

[pulsar] branch getTxnState created (now 8d8701576cc)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a change to branch getTxnState
in repository https://gitbox.apache.org/repos/asf/pulsar.git


      at 8d8701576cc [improve][txn] Add getState in transaction for client API (#17423)

This branch includes the following new commits:

     new 8d8701576cc [improve][txn] Add getState in transaction for client API (#17423)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar] 01/01: [improve][txn] Add getState in transaction for client API (#17423)

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch getTxnState
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d8701576ccca727ce8ebaf33ad6cefd988b27e4
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 22 23:00:24 2022 +0800

    [improve][txn] Add getState in transaction for client API (#17423)
    
    Currently, `org.apache.pulsar.client.api.transaction.Transaction` does not provide a method for users to get the transaction state.
    
    Users can get the transaction state to perform their own operations.
    1. Add the `getState` method to the `org.apache.pulsar.client.api.transaction.Transaction` interface.
    2. `TransactionImpl` implements the new method.
    ```
        /**
         * Get transaction state.
         *
         * @return {@link State} the state of the transaction.
         */
        State getState();
    ```
    Add a test for `getState`.
    
    (cherry picked from commit a9531dbfafe73e5f351ae3f526892ffdb55129b2)
---
 .../pulsar/broker/transaction/TransactionTest.java | 48 ++++++++++++++++++
 .../client/impl/TransactionEndToEndTest.java       |  2 +-
 .../pulsar/client/api/transaction/Transaction.java | 57 ++++++++++++++++++++++
 .../client/impl/transaction/TransactionImpl.java   | 17 +++----
 4 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ccdca13c996..5aafbb25060 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1495,4 +1495,52 @@ public class TransactionTest extends TransactionTestBase {
                     .send();
         txn.commit();
     }
+
+    @Test
+    public void testGetTxnState() throws Exception {
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+
+        // test OPEN and TIMEOUT
+        assertEquals(transaction.getState(), Transaction.State.OPEN);
+        Transaction timeoutTxn = transaction;
+        Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);
+
+        // test abort
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        transaction.abort().get();
+        assertEquals(transaction.getState(), Transaction.State.ABORTED);
+
+        // test commit
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        transaction.commit().get();
+        assertEquals(transaction.getState(), Transaction.State.COMMITTED);
+
+        // test error
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+        pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .endTransaction(transaction.getTxnID(), 0, false);
+        transaction.commit();
+        Transaction errorTxn = transaction;
+        Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);
+
+        // test committing
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+        transaction.commit();
+        Transaction committingTxn = transaction;
+        Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);
+
+        // test aborting
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+        transaction.abort();
+        Transaction abortingTxn = transaction;
+        Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index bc56eab6bc1..454edebe6f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1186,7 +1186,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 .build().get();
         producer.newMessage().send();
         Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
         });
 
         try {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
index fd4cf0bc166..33e96d5c276 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
@@ -29,6 +29,55 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public interface Transaction {
 
+    enum State {
+
+        /**
+         * When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
+         *
+         * When a transaction is in the `OPEN` state, it can commit or abort.
+         */
+        OPEN,
+
+        /**
+         * When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
+         */
+        COMMITTING,
+
+        /**
+         * When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
+         */
+        ABORTING,
+
+        /**
+         * When a client receives a response to a commit, the transaction state is changed from
+         * `COMMITTING` to `COMMITTED`.
+         */
+        COMMITTED,
+
+        /**
+         * When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
+         */
+        ABORTED,
+
+        /**
+         * When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
+         * then the state is changed to `ERROR`.
+         *
+         * When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
+         * then the state is changed to `ERROR`.
+         *
+         * When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
+         * then the state is changed to `ERROR`.
+         */
+        ERROR,
+
+        /**
+         * When a transaction is timed out and the transaction state is `OPEN`,
+         * then the transaction state is changed from `OPEN` to `TIME_OUT`.
+         */
+        TIME_OUT
+    }
+
     /**
      * Commit the transaction.
      *
@@ -48,4 +97,12 @@ public interface Transaction {
      *  @return {@link TxnID} the txnID.
      */
     TxnID getTxnID();
+
+    /**
+     * Get transaction state.
+     *
+     * @return {@link State} the state of the transaction.
+     */
+    State getState();
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 55b20438693..833b0957d1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public void run(Timeout timeout) throws Exception {
-        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
-    }
-
-    public enum State {
-        OPEN,
-        COMMITTING,
-        ABORTING,
-        COMMITTED,
-        ABORTED,
-        ERROR,
-        TIMEOUT
+        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
     }
 
     TransactionImpl(PulsarClientImpl client,
@@ -215,6 +205,11 @@ public class TransactionImpl implements Transaction , TimerTask {
         return new TxnID(txnIdMostBits, txnIdLeastBits);
     }
 
+    @Override
+    public State getState() {
+        return state;
+    }
+
     public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
         if (state == State.OPEN) {
             return true;