You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/09/22 15:00:32 UTC
[pulsar] branch master updated: [improve][txn] Add getState in transaction for client API (#17423)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a9531dbfafe [improve][txn] Add getState in transaction for client API (#17423)
a9531dbfafe is described below
commit a9531dbfafe73e5f351ae3f526892ffdb55129b2
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 22 23:00:24 2022 +0800
[improve][txn] Add getState in transaction for client API (#17423)
### Motivation
Currently, `org.apache.pulsar.client.api.transaction.Transaction` does not provide a method for users to get the transaction state.
With such a method, users can read the transaction state and act on it in their own logic.
### Modifications
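1. Add the `getState` method to the `org.apache.pulsar.client.api.transaction.Transaction` interface.
2. Implement `getState` in `TransactionImpl`; the `State` enum moves from `TransactionImpl` to the `Transaction` interface, and `TIMEOUT` is renamed to `TIME_OUT`.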
```
/**
* Get transaction state.
*
* @return {@link State} the state of the transaction.
*/
State getState();
```
### Verifying this change
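Added the test `TransactionTest#testGetTxnState`, which checks the `OPEN`, `TIME_OUT`, `ABORTED`, `COMMITTED`, `ERROR`, `COMMITTING`, and `ABORTING` states.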
---
.../pulsar/broker/transaction/TransactionTest.java | 48 ++++++++++++++++++
.../client/impl/TransactionEndToEndTest.java | 2 +-
.../pulsar/client/api/transaction/Transaction.java | 57 ++++++++++++++++++++++
.../client/impl/transaction/TransactionImpl.java | 17 +++----
4 files changed, 112 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ffc351e9413..c309f69fd56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1469,4 +1469,52 @@ public class TransactionTest extends TransactionTestBase {
Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
}
}
+
+ @Test
+ public void testGetTxnState() throws Exception {
+ Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+ .build().get();
+
+ // test OPEN and TIMEOUT
+ assertEquals(transaction.getState(), Transaction.State.OPEN);
+ Transaction timeoutTxn = transaction;
+ Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);
+
+ // test abort
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+ .build().get();
+ transaction.abort().get();
+ assertEquals(transaction.getState(), Transaction.State.ABORTED);
+
+ // test commit
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+ .build().get();
+ transaction.commit().get();
+ assertEquals(transaction.getState(), Transaction.State.COMMITTED);
+
+ // test error
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+ .build().get();
+ pulsarServiceList.get(0).getTransactionMetadataStoreService()
+ .endTransaction(transaction.getTxnID(), 0, false);
+ transaction.commit();
+ Transaction errorTxn = transaction;
+ Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);
+
+ // test committing
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+ .build().get();
+ ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+ transaction.commit();
+ Transaction committingTxn = transaction;
+ Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);
+
+ // test aborting
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+ .build().get();
+ ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+ transaction.abort();
+ Transaction abortingTxn = transaction;
+ Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index ddb9062454b..aea77bec136 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1052,7 +1052,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
- Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+ Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
});
try {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
index fd4cf0bc166..33e96d5c276 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
@@ -29,6 +29,55 @@ import org.apache.pulsar.common.classification.InterfaceStability;
@InterfaceStability.Evolving
public interface Transaction {
+ enum State {
+
+ /**
+ * When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
+ *
+ * When a transaction is in the `OPEN` state, it can commit or abort.
+ */
+ OPEN,
+
+ /**
+ * When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
+ */
+ COMMITTING,
+
+ /**
+ * When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
+ */
+ ABORTING,
+
+ /**
+ * When a client receives a response to a commit, the transaction state is changed from
+ * `COMMITTING` to `COMMITTED`.
+ */
+ COMMITTED,
+
+ /**
+ * When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
+ */
+ ABORTED,
+
+ /**
+ * When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
+ * then the state is changed to `ERROR`.
+ *
+ * When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
+ * then the state is changed to `ERROR`.
+ *
+ * When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
+ * then the state is changed to `ERROR`.
+ */
+ ERROR,
+
+ /**
+ * When a transaction is timed out and the transaction state is `OPEN`,
+ * then the transaction state is changed from `OPEN` to `TIME_OUT`.
+ */
+ TIME_OUT
+ }
+
/**
* Commit the transaction.
*
@@ -48,4 +97,12 @@ public interface Transaction {
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();
+
+ /**
+ * Get transaction state.
+ *
+ * @return {@link State} the state of the transaction.
+ */
+ State getState();
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 55b20438693..833b0957d1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask {
@Override
public void run(Timeout timeout) throws Exception {
- STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
- }
-
- public enum State {
- OPEN,
- COMMITTING,
- ABORTING,
- COMMITTED,
- ABORTED,
- ERROR,
- TIMEOUT
+ STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
}
TransactionImpl(PulsarClientImpl client,
@@ -215,6 +205,11 @@ public class TransactionImpl implements Transaction , TimerTask {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}
+ @Override
+ public State getState() {
+ return state;
+ }
+
public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
return true;