You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/02 14:32:55 UTC
[pulsar] branch master updated: [Transaction] Support transaction
abort on partition (#7953)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3ac98d8 [Transaction] Support transaction abort on partition (#7953)
3ac98d8 is described below
commit 3ac98d8d9e70ed4305d8739638dbab6833eeabef
Author: ran <ga...@126.com>
AuthorDate: Wed Sep 2 22:32:31 2020 +0800
[Transaction] Support transaction abort on partition (#7953)
Fixes https://github.com/streamnative/pulsar/issues/1312
# Motivation
Currently, the transaction abort on partitions operation is not getting through.
### Modifications
Make the transaction abort on partitions operation get through.
---
.../broker/TransactionMetadataStoreService.java | 19 ++-
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../buffer/impl/PersistentTransactionBuffer.java | 17 +--
.../broker/transaction/TransactionProduceTest.java | 144 ++++++++++++++++-----
.../pulsar/client/transaction/EndToEndTest.java | 45 ++++++-
.../apache/pulsar/client/impl/ProducerImpl.java | 6 +-
.../client/impl/transaction/TransactionImpl.java | 8 +-
7 files changed, 183 insertions(+), 58 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 066e588..b1ef17b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -194,7 +194,7 @@ public class TransactionMetadataStoreService {
}
completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
- .thenCompose(ignored -> endToTB(txnID, newStatus));
+ .thenCompose(ignored -> endToTB(txnID, txnAction));
if (TxnStatus.COMMITTING.equals(newStatus)) {
completableFuture = completableFuture
.thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING));
@@ -205,7 +205,7 @@ public class TransactionMetadataStoreService {
return completableFuture;
}
- private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
+ private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
@@ -214,16 +214,15 @@ public class TransactionMetadataStoreService {
return;
}
txnMeta.producedPartitions().forEach(partition -> {
- CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
- if (TxnStatus.COMMITTING.equals(newStatus)) {
- commitFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
- } else if (TxnStatus.ABORTING.equals(newStatus)) {
- commitFuture.completeExceptionally(new Throwable("Unsupported operation."));
+ CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
+ if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+ actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
+ } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+ actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
} else {
- // Unsupported txnStatus
- commitFuture.completeExceptionally(new Throwable("Unsupported txnStatus."));
+ actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
}
- commitFutureList.add(commitFuture);
+ commitFutureList.add(actionFuture);
});
try {
FutureUtil.waitForAll(commitFutureList).whenComplete((ignored, waitThrowable) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8d076ce..071ef80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1717,7 +1717,7 @@ public class ServerCnx extends PulsarHandler {
final int txnAction = command.getTxnAction().getNumber();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
- service.getTopics().get(command.getTopic()).whenComplete((topic, t) -> {
+ service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic, t) -> {
if (!topic.isPresent()) {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
command.getRequestId(), ServerError.TopicNotFound,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
index 65f787d..4c654ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
@@ -152,20 +152,15 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
@Override
public CompletableFuture<Void> endTxnOnPartition(TxnID txnID, int txnAction) {
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
- committingTxn(txnID).whenComplete((ignored, throwable) -> {
- if (throwable != null) {
- completableFuture.completeExceptionally(throwable);
- return;
- }
- completableFuture.complete(null);
- });
+ future = committingTxn(txnID);
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
- // TODO handle abort operation
- completableFuture.complete(null);
+ future = abortTxn(txnID);
+ } else {
+ future.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
}
- return completableFuture;
+ return future;
}
private CompletableFuture<Void> committingTxn(TxnID txnID) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 37ccc84..edcfd17 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -115,7 +116,6 @@ public class TransactionProduceTest extends TransactionTestBase {
.topic(TOPIC_OUTPUT)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
- .roundRobinRouterBatchingPartitionSwitchFrequency(1)
.create();
int messageCntPerPartition = 3;
@@ -140,35 +140,12 @@ public class TransactionProduceTest extends TransactionTestBase {
}
// the messageId callback can't be called before commit
- futureList.forEach(messageIdFuture -> {
- try {
- messageIdFuture.get(1, TimeUnit.SECONDS);
- Assert.fail("MessageId shouldn't be get before txn commit.");
- } catch (Exception e) {
- if (e instanceof TimeoutException) {
- log.info("This is a expected exception.");
- } else {
- log.error("This exception is not expected.", e);
- Assert.fail("This exception is not expected.");
- }
- }
- });
+ checkMessageId(futureList, false);
tnx.commit().get();
- Thread.sleep(3000L);
-
// the messageId callback should be called after commit
- futureList.forEach(messageIdFuture -> {
- try {
- MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
- Assert.assertNotNull(messageId);
- log.info("Tnx commit success! messageId: {}", messageId);
- } catch (Exception e) {
- log.error("Tnx commit failed! tnx: " + tnx, e);
- Assert.fail("Tnx commit failed! tnx: " + tnx);
- }
- });
+ checkMessageId(futureList, true);
for (int i = 0; i < TOPIC_PARTITION; i++) {
// the target topic partition received the commit marker
@@ -220,13 +197,116 @@ public class TransactionProduceTest extends TransactionTestBase {
System.out.println("finish test");
}
+ @Test
+ public void produceAndAbortTest() throws Exception {
+ String topic = NAMESPACE1 + "/produce-abort-test";
+ PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+ TransactionImpl txn = (TransactionImpl) pulsarClientImpl.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+
+ @Cleanup
+ ProducerImpl<byte[]> outProducer = (ProducerImpl<byte[]>) pulsarClientImpl
+ .newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ int messageCnt = 10;
+ Set<String> messageSet = new HashSet<>();
+ List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
+ for (int i = 0; i < messageCnt; i++) {
+ String msg = "Hello Txn - " + i;
+ messageSet.add(msg);
+ CompletableFuture<MessageId> produceFuture = outProducer
+ .newMessage(txn).value(msg.getBytes(UTF_8)).sendAsync();
+ futureList.add(produceFuture);
+ }
+
+ // the target topic hasn't the abort marker before commit
+ ReadOnlyCursor originTopicCursor = getOriginTopicCursor(topic, -1);
+ Assert.assertNotNull(originTopicCursor);
+ Assert.assertFalse(originTopicCursor.hasMoreEntries());
+ originTopicCursor.close();
+
+ // the messageId callback can't be called before commit
+ checkMessageId(futureList, false);
+
+ txn.abort().get();
+
+ // the messageId callback should be called after commit
+ checkMessageId(futureList, true);
+
+ // the target topic partition doesn't have any entries
+ originTopicCursor = getOriginTopicCursor(topic, -1);
+ Assert.assertNotNull(originTopicCursor);
+ Assert.assertFalse(originTopicCursor.hasMoreEntries());
+
+ // the target topic transactionBuffer should receive the transaction messages,
+ // committing marker and commit marker
+ ReadOnlyCursor tbTopicCursor = getTBTopicCursor(topic, -1);
+ Assert.assertNotNull(tbTopicCursor);
+ Assert.assertTrue(tbTopicCursor.hasMoreEntries());
+ long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
+ log.info("transaction buffer entries count: {}", tbEntriesCnt);
+ Assert.assertEquals(tbEntriesCnt, messageCnt + 1);
+
+ PulsarApi.MessageMetadata messageMetadata;
+ List<Entry> entries = tbTopicCursor.readEntries((int) tbEntriesCnt);
+ // check the messages
+ for (int i = 0; i < messageCnt; i++) {
+ messageMetadata = Commands.parseMessageMetadata(entries.get(i).getDataBuffer());
+ Assert.assertEquals(messageMetadata.getTxnidMostBits(), txn.getTxnIdMostBits());
+ Assert.assertEquals(messageMetadata.getTxnidLeastBits(), txn.getTxnIdLeastBits());
+
+ byte[] bytes = new byte[entries.get(i).getDataBuffer().readableBytes()];
+ entries.get(i).getDataBuffer().readBytes(bytes);
+ Assert.assertTrue(messageSet.remove(new String(bytes)));
+ }
+
+ // check abort marker
+ messageMetadata = Commands.parseMessageMetadata(entries.get(messageCnt).getDataBuffer());
+ Assert.assertEquals(PulsarMarkers.MarkerType.TXN_ABORT_VALUE, messageMetadata.getMarkerType());
+
+ Assert.assertEquals(0, messageSet.size());
+ log.info("finish test produceAndAbortTest.");
+ }
+
+ private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boolean isFinished) {
+ futureList.forEach(messageIdFuture -> {
+ try {
+ MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
+ if (isFinished) {
+ Assert.assertNotNull(messageId);
+ log.info("Tnx commit success! messageId: {}", messageId);
+ } else {
+ Assert.fail("MessageId shouldn't be get before txn abort.");
+ }
+ } catch (Exception e) {
+ if (!isFinished) {
+ if (e instanceof TimeoutException) {
+ log.info("This is a expected exception.");
+ } else {
+ log.error("This exception is not expected.", e);
+ Assert.fail("This exception is not expected.");
+ }
+ } else {
+ log.error("Tnx commit failed!", e);
+ Assert.fail("Tnx commit failed!");
+ }
+ }
+ });
+ }
+
private ReadOnlyCursor getTBTopicCursor(String topic, int partition) {
try {
- String tbTopicName = PersistentTransactionBuffer.getTransactionBufferTopicName(
- TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition);
+ String topicSuffix = partition >= 0 ? TopicName.PARTITIONED_TOPIC_SUFFIX + partition : "";
+ topic = PersistentTransactionBuffer.getTransactionBufferTopicName(
+ TopicName.get(topic).toString() + topicSuffix);
return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
- TopicName.get(tbTopicName).getPersistenceNamingEncoding(),
+ TopicName.get(topic).getPersistenceNamingEncoding(),
PositionImpl.earliest, new ManagedLedgerConfig());
} catch (Exception e) {
log.error("Failed to get transaction buffer topic readonly cursor.", e);
@@ -237,9 +317,11 @@ public class TransactionProduceTest extends TransactionTestBase {
private ReadOnlyCursor getOriginTopicCursor(String topic, int partition) {
try {
- String partitionTopic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+ if (partition >= 0) {
+ topic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+ }
return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
- TopicName.get(partitionTopic).getPersistenceNamingEncoding(),
+ TopicName.get(topic).getPersistenceNamingEncoding(),
PositionImpl.earliest, new ManagedLedgerConfig());
} catch (Exception e) {
log.error("Failed to get origin topic readonly cursor.", e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
index f9c4940..a1a5028 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
@@ -87,7 +87,7 @@ public class EndToEndTest extends TransactionTestBase {
}
@Test
- public void test() throws Exception {
+ public void partitionCommitTest() throws Exception {
Transaction txn = ((PulsarClientImpl) pulsarClient)
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
@@ -149,4 +149,47 @@ public class EndToEndTest extends TransactionTestBase {
log.info("receive transaction messages count: {}", receiveCnt);
}
+ @Test
+ public void partitionAbortTest() throws Exception {
+ Transaction txn = ((PulsarClientImpl) pulsarClient)
+ .newTransaction()
+ .withTransactionTimeout(2, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ @Cleanup
+ PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient
+ .newProducer()
+ .topic(TOPIC_OUTPUT)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ int messageCnt = 10;
+ for (int i = 0; i < messageCnt; i++) {
+ producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+ }
+
+ @Cleanup
+ MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient
+ .newConsumer()
+ .topic(TOPIC_OUTPUT)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("test")
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ // Can't receive transaction messages before abort.
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ txn.abort().get();
+
+ // Cant't receive transaction messages after abort.
+ message = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ log.info("finished test partitionAbortTest");
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 4cb7a22..afc0195 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -269,6 +269,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
@Override
CompletableFuture<MessageId> internalSendAsync(Message<?> message, Transaction txn) {
+ if (txn instanceof TransactionImpl) {
+ ((TransactionImpl) txn).registerProducedTopic(topic);
+ }
CompletableFuture<MessageId> future = new CompletableFuture<>();
@@ -343,9 +346,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
nextCallback = scb;
}
});
- if (txn instanceof TransactionImpl) {
- ((TransactionImpl) txn).registerProducedTopic(topic);
- }
return future;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 368d805..38530b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -138,6 +138,12 @@ public class TransactionImpl implements Transaction {
@Override
public CompletableFuture<Void> abort() {
- return FutureUtil.failedFuture(new UnsupportedOperationException("Not Implemented Yet"));
+ return tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((ignored, throwable) -> {
+ sendOps.values().forEach(txnSendOp -> {
+ txnSendOp.sendFuture.whenComplete(((messageId, t) -> {
+ txnSendOp.transactionalSendFuture.complete(messageId);
+ }));
+ });
+ });
}
}