You are viewing a plain-text version of this content. The link to the canonical version (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/09 06:40:40 UTC

[pulsar] branch branch-2.11 updated: [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new e42cca5268e [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)
e42cca5268e is described below

commit e42cca5268e5e926136f3f138f8aa392c0f42c0b
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Apr 20 18:11:52 2023 +0800

    [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)
---
 .../broker/transaction/TransactionProduceTest.java | 35 +++++++++--
 .../client/impl/transaction/TransactionImpl.java   | 71 ++++++++++++----------
 2 files changed, 70 insertions(+), 36 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0d1bbda4568..37812d2b12b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
@@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -51,10 +54,11 @@ import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
@@ -70,7 +74,7 @@ public class TransactionProduceTest extends TransactionTestBase {
     private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
     private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
     private static final int NUM_PARTITIONS = 16;
-    @BeforeMethod
+    @BeforeClass
     protected void setup() throws Exception {
         setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
         admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, TOPIC_PARTITION);
@@ -78,7 +82,7 @@ public class TransactionProduceTest extends TransactionTestBase {
         admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, TOPIC_PARTITION);
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
@@ -369,5 +373,26 @@ public class TransactionProduceTest extends TransactionTestBase {
         return pendingAckCount;
     }
 
-
+    @Test
+    public void testCommitFailure() throws Exception {
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        final String topic = NAMESPACE1 + "/test-commit-failure";
+        @Cleanup
+        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        producer.newMessage(txn).value(new byte[1024 * 1024 * 10]).sendAsync();
+        try {
+            txn.commit().get();
+            Assert.fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionHasOperationFailedException);
+            Assert.assertEquals(txn.getState(), Transaction.State.ABORTED);
+        }
+        try {
+            getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID())
+                    .getNow(null);
+            Assert.fail();
+        } catch (CompletionException e) {
+            Assert.assertTrue(e.getCause() instanceof CoordinatorException.TransactionNotFoundException);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 833b0957d1c..68a10fed2d8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -23,6 +23,7 @@ import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -144,12 +145,20 @@ public class TransactionImpl implements Transaction , TimerTask {
     @Override
     public CompletableFuture<Void> commit() {
         timeout.cancel();
-        return checkIfOpenOrCommitting().thenCompose((value) -> {
+        return checkState(State.OPEN, State.COMMITTING).thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
+<<<<<<< HEAD
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
+=======
+            opFuture.whenComplete((v, e) -> {
+                if (hasOpsFailed) {
+                    checkState(State.COMMITTING).thenCompose(__ -> internalAbort()).whenComplete((vx, ex) ->
+                            commitFuture.completeExceptionally(
+                                    new PulsarClientException.TransactionHasOperationFailedException()));
+>>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or ACK failed (#20055))
                 } else {
                     tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
                             .whenComplete((vx, ex) -> {
@@ -173,6 +182,7 @@ public class TransactionImpl implements Transaction , TimerTask {
     @Override
     public CompletableFuture<Void> abort() {
         timeout.cancel();
+<<<<<<< HEAD
         return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
@@ -181,23 +191,32 @@ public class TransactionImpl implements Transaction , TimerTask {
                     log.error(e.getMessage());
                 }
                 tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {
+=======
+        return checkState(State.OPEN, State.ABORTING).thenCompose(__ -> internalAbort());
+    }
+>>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or ACK failed (#20055))
 
-                    if (ex != null) {
-                        if (ex instanceof TransactionNotFoundException
-                                || ex instanceof InvalidTxnStatusException) {
-                            this.state = State.ERROR;
-                        }
-                        abortFuture.completeExceptionally(ex);
-                    } else {
-                        this.state = State.ABORTED;
-                        abortFuture.complete(null);
+    private CompletableFuture<Void> internalAbort() {
+        CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+        this.state = State.ABORTING;
+        opFuture.whenComplete((v, e) -> {
+            tcClient.abortAsync(txnId).whenComplete((vx, ex) -> {
+
+                if (ex != null) {
+                    if (ex instanceof TransactionNotFoundException
+                            || ex instanceof InvalidTxnStatusException) {
+                        this.state = State.ERROR;
                     }
+                    abortFuture.completeExceptionally(ex);
+                } else {
+                    this.state = State.ABORTED;
+                    abortFuture.complete(null);
+                }
 
-                });
             });
-
-            return abortFuture;
         });
+
+        return abortFuture;
     }
 
     @Override
@@ -221,26 +240,16 @@ public class TransactionImpl implements Transaction , TimerTask {
         }
     }
 
-    private CompletableFuture<Void> checkIfOpenOrCommitting() {
-        if (state == State.OPEN || state == State.COMMITTING) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return invalidTxnStatusFuture();
-        }
-    }
-
-    private CompletableFuture<Void> checkIfOpenOrAborting() {
-        if (state == State.OPEN || state == State.ABORTING) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return invalidTxnStatusFuture();
+    private CompletableFuture<Void> checkState(State... expectedStates) {
+        final State actualState = STATE_UPDATE.get(this);
+        for (State expectedState : expectedStates) {
+            if (actualState == expectedState) {
+                return CompletableFuture.completedFuture(null);
+            }
         }
-    }
-
-    private CompletableFuture<Void> invalidTxnStatusFuture() {
         return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":"
-                + txnIdLeastBits + "] with unexpected state : "
-                + state.name() + ", expect " + State.OPEN + " state!"));
+                + txnIdLeastBits + "] with unexpected state: " + actualState.name() + ", expect: "
+                + Arrays.toString(expectedStates)));
     }