You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/01 16:24:25 UTC

[pulsar] 16/21: [Transaction] Fix end transaction at state of timeout (#14370)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3089aa4040cc6c42c05898fab00c98332a9cb395
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:15:53 2022 +0800

    [Transaction] Fix end transaction at state of timeout (#14370)
    
    ### Motivation
    For concurrency problems, timeout may change the status to timeout before commit/abort changes the status to committing/aborting.
    
    ### Modification
    Cancel timeout when commit or abort and then check the state.
    
    (cherry picked from commit 4b480450f32dc6ce5337d0b3d68a35111ddf474e)
---
 .../org/apache/pulsar/client/impl/transaction/TransactionImpl.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 8adc162..bba5331 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -165,10 +165,10 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public CompletableFuture<Void> commit() {
+        timeout.cancel();
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
-            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -194,10 +194,10 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public CompletableFuture<Void> abort() {
+        timeout.cancel();
         return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
-            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     log.error(e.getMessage());