You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 08:29:46 UTC
[pulsar] branch branch-2.10 updated: [Transaction] Fix end transaction at state of timeout (#14370)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 2062d21 [Transaction] Fix end transaction at state of timeout (#14370)
2062d21 is described below
commit 2062d2138342563587604907ae32a5f05f064b09
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:15:53 2022 +0800
[Transaction] Fix end transaction at state of timeout (#14370)
### Motivation
For concurrency problems, timeout may change the status to timeout before commit/abort changes the status to committing/aborting.
### Modification
Cancel timeout when commit or abort and then check the state.
(cherry picked from commit 4b480450f32dc6ce5337d0b3d68a35111ddf474e)
---
.../org/apache/pulsar/client/impl/transaction/TransactionImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 9cdb428..aa7a180 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -165,10 +165,10 @@ public class TransactionImpl implements Transaction , TimerTask {
@Override
public CompletableFuture<Void> commit() {
+ timeout.cancel();
return checkIfOpenOrCommitting().thenCompose((value) -> {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
this.state = State.COMMITTING;
- timeout.cancel();
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -194,10 +194,10 @@ public class TransactionImpl implements Transaction , TimerTask {
@Override
public CompletableFuture<Void> abort() {
+ timeout.cancel();
return checkIfOpenOrAborting().thenCompose(value -> {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
this.state = State.ABORTING;
- timeout.cancel();
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
log.error(e.getMessage());