You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/09 06:40:40 UTC
[pulsar] branch branch-2.11 updated: [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new e42cca5268e [fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)
e42cca5268e is described below
commit e42cca5268e5e926136f3f138f8aa392c0f42c0b
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Apr 20 18:11:52 2023 +0800
[fix][txn] Fix transaction is not aborted when send or ACK failed (#20055)
---
.../broker/transaction/TransactionProduceTest.java | 35 +++++++++--
.../client/impl/transaction/TransactionImpl.java | 71 ++++++++++++----------
2 files changed, 70 insertions(+), 36 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0d1bbda4568..37812d2b12b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
@@ -43,6 +45,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -51,10 +54,11 @@ import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
@@ -70,7 +74,7 @@ public class TransactionProduceTest extends TransactionTestBase {
private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
private static final int NUM_PARTITIONS = 16;
- @BeforeMethod
+ @BeforeClass
protected void setup() throws Exception {
setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, TOPIC_PARTITION);
@@ -78,7 +82,7 @@ public class TransactionProduceTest extends TransactionTestBase {
admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, TOPIC_PARTITION);
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
}
@@ -369,5 +373,26 @@ public class TransactionProduceTest extends TransactionTestBase {
return pendingAckCount;
}
-
+ @Test
+ public void testCommitFailure() throws Exception {
+ Transaction txn = pulsarClient.newTransaction().build().get();
+ final String topic = NAMESPACE1 + "/test-commit-failure";
+ @Cleanup
+ final Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+ producer.newMessage(txn).value(new byte[1024 * 1024 * 10]).sendAsync();
+ try {
+ txn.commit().get();
+ Assert.fail();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionHasOperationFailedException);
+ Assert.assertEquals(txn.getState(), Transaction.State.ABORTED);
+ }
+ try {
+ getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txn.getTxnID())
+ .getNow(null);
+ Assert.fail();
+ } catch (CompletionException e) {
+ Assert.assertTrue(e.getCause() instanceof CoordinatorException.TransactionNotFoundException);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 833b0957d1c..68a10fed2d8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -23,6 +23,7 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.List;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -144,12 +145,20 @@ public class TransactionImpl implements Transaction , TimerTask {
@Override
public CompletableFuture<Void> commit() {
timeout.cancel();
- return checkIfOpenOrCommitting().thenCompose((value) -> {
+ return checkState(State.OPEN, State.COMMITTING).thenCompose((value) -> {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
this.state = State.COMMITTING;
+<<<<<<< HEAD
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
+=======
+ opFuture.whenComplete((v, e) -> {
+ if (hasOpsFailed) {
+ checkState(State.COMMITTING).thenCompose(__ -> internalAbort()).whenComplete((vx, ex) ->
+ commitFuture.completeExceptionally(
+ new PulsarClientException.TransactionHasOperationFailedException()));
+>>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or ACK failed (#20055))
} else {
tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
.whenComplete((vx, ex) -> {
@@ -173,6 +182,7 @@ public class TransactionImpl implements Transaction , TimerTask {
@Override
public CompletableFuture<Void> abort() {
timeout.cancel();
+<<<<<<< HEAD
return checkIfOpenOrAborting().thenCompose(value -> {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
this.state = State.ABORTING;
@@ -181,23 +191,32 @@ public class TransactionImpl implements Transaction , TimerTask {
log.error(e.getMessage());
}
tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {
+=======
+ return checkState(State.OPEN, State.ABORTING).thenCompose(__ -> internalAbort());
+ }
+>>>>>>> 00d09cbbd2b ([fix][txn] Fix transaction is not aborted when send or ACK failed (#20055))
- if (ex != null) {
- if (ex instanceof TransactionNotFoundException
- || ex instanceof InvalidTxnStatusException) {
- this.state = State.ERROR;
- }
- abortFuture.completeExceptionally(ex);
- } else {
- this.state = State.ABORTED;
- abortFuture.complete(null);
+ private CompletableFuture<Void> internalAbort() {
+ CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+ this.state = State.ABORTING;
+ opFuture.whenComplete((v, e) -> {
+ tcClient.abortAsync(txnId).whenComplete((vx, ex) -> {
+
+ if (ex != null) {
+ if (ex instanceof TransactionNotFoundException
+ || ex instanceof InvalidTxnStatusException) {
+ this.state = State.ERROR;
}
+ abortFuture.completeExceptionally(ex);
+ } else {
+ this.state = State.ABORTED;
+ abortFuture.complete(null);
+ }
- });
});
-
- return abortFuture;
});
+
+ return abortFuture;
}
@Override
@@ -221,26 +240,16 @@ public class TransactionImpl implements Transaction , TimerTask {
}
}
- private CompletableFuture<Void> checkIfOpenOrCommitting() {
- if (state == State.OPEN || state == State.COMMITTING) {
- return CompletableFuture.completedFuture(null);
- } else {
- return invalidTxnStatusFuture();
- }
- }
-
- private CompletableFuture<Void> checkIfOpenOrAborting() {
- if (state == State.OPEN || state == State.ABORTING) {
- return CompletableFuture.completedFuture(null);
- } else {
- return invalidTxnStatusFuture();
+ private CompletableFuture<Void> checkState(State... expectedStates) {
+ final State actualState = STATE_UPDATE.get(this);
+ for (State expectedState : expectedStates) {
+ if (actualState == expectedState) {
+ return CompletableFuture.completedFuture(null);
+ }
}
- }
-
- private CompletableFuture<Void> invalidTxnStatusFuture() {
return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":"
- + txnIdLeastBits + "] with unexpected state : "
- + state.name() + ", expect " + State.OPEN + " state!"));
+ + txnIdLeastBits + "] with unexpected state: " + actualState.name() + ", expect: "
+ + Arrays.toString(expectedStates)));
}