You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 08:29:46 UTC

[pulsar] branch branch-2.10 updated: [Transaction] Fix end transaction at state of timeout (#14370)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 2062d21  [Transaction] Fix end transaction at state of timeout (#14370)
2062d21 is described below

commit 2062d2138342563587604907ae32a5f05f064b09
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:15:53 2022 +0800

    [Transaction] Fix end transaction at state of timeout (#14370)
    
    ### Motivation
    Because of a race condition, the timeout task may change the transaction state to timed out before commit/abort changes it to committing/aborting.
    
    ### Modification
    Cancel the timeout at the start of commit or abort, before checking the state.
    
    (cherry picked from commit 4b480450f32dc6ce5337d0b3d68a35111ddf474e)
---
 .../org/apache/pulsar/client/impl/transaction/TransactionImpl.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 9cdb428..aa7a180 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -165,10 +165,10 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public CompletableFuture<Void> commit() {
+        timeout.cancel();
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
-            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -194,10 +194,10 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public CompletableFuture<Void> abort() {
+        timeout.cancel();
         return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
-            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     log.error(e.getMessage());