You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:33 UTC

[pulsar] 10/13: [Transaction] Optimize transaction timeout (#14172)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de5dec4239fbd1c3a595e013a81f262059444489
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Feb 9 15:34:25 2022 +0800

    [Transaction] Optimize transaction timeout (#14172)
    
    (cherry picked from commit 5ee210a5a12573bf8d047962bbac82528091216c)
---
 .../pulsar/broker/transaction/TransactionTest.java | 27 +++++++++++++++++++++-
 .../impl/transaction/TransactionBuilderImpl.java   |  4 +---
 .../client/impl/transaction/TransactionImpl.java   |  6 +++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 4aaadb2..b627438 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -34,6 +34,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import io.netty.buffer.Unpooled;
+import io.netty.util.Timeout;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -774,4 +775,28 @@ public class TransactionTest extends TransactionTestBase {
                 }));
         completableFuture.get(5, TimeUnit.SECONDS);
     }
-}
+
+    @Test // Verifies that committing or aborting a transaction cancels its client-side timeout task.
+    public void testCancelTxnTimeout() throws Exception{
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.commit().get(); // block until the commit future completes
+
+        Field field = TransactionImpl.class.getDeclaredField("timeout"); // private field: read reflectively
+        field.setAccessible(true);
+        Timeout timeout = (Timeout) field.get(transaction);
+        Assert.assertTrue(timeout.isCancelled()); // commit() is expected to have cancelled the timer
+
+        transaction = pulsarClient.newTransaction() // fresh transaction for the abort path
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.abort().get(); // block until the abort future completes
+        timeout = (Timeout) field.get(transaction);
+        Assert.assertTrue(timeout.isCancelled()); // abort() is expected to have cancelled the timer
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 3ac8676..9878264 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -69,10 +69,8 @@ public class TransactionBuilderImpl implements TransactionBuilder {
                         future.completeExceptionally(throwable);
                         return;
                     }
-                    TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
+                    TransactionImpl transaction = new TransactionImpl(client, timeUnit.toMillis(txnTimeout),
                             txnID.getLeastSigBits(), txnID.getMostSigBits());
-                    client.getTimer().newTimeout(transaction,
-                            txnTimeout, timeUnit);
                     future.complete(transaction);
                 });
         return future;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index ebcb20e..8adc162 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.Lists;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -68,6 +69,7 @@ public class TransactionImpl implements Transaction , TimerTask {
     private volatile State state;
     private static final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
         AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
+    private final Timeout timeout;
 
     @Override
     public void run(Timeout timeout) throws Exception {
@@ -100,6 +102,8 @@ public class TransactionImpl implements Transaction , TimerTask {
 
         this.sendFutureList = new ArrayList<>();
         this.ackFutureList = new ArrayList<>();
+        this.timeout = client.getTimer().newTimeout(this, transactionTimeoutMs, TimeUnit.MILLISECONDS);
+
     }
 
     // register the topics that will be modified by this transaction
@@ -164,6 +168,7 @@ public class TransactionImpl implements Transaction , TimerTask {
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
+            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -192,6 +197,7 @@ public class TransactionImpl implements Transaction , TimerTask {
         return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
+            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     log.error(e.getMessage());