You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/02 14:32:55 UTC

[pulsar] branch master updated: [Transaction] Support transaction abort on partition (#7953)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ac98d8  [Transaction] Support transaction abort on partition (#7953)
3ac98d8 is described below

commit 3ac98d8d9e70ed4305d8739638dbab6833eeabef
Author: ran <ga...@126.com>
AuthorDate: Wed Sep 2 22:32:31 2020 +0800

    [Transaction] Support transaction abort on partition (#7953)
    
    Fixes https://github.com/streamnative/pulsar/issues/1312
    
    # Motivation
    
    Currently, the transaction abort on partitions operation is not getting through.
    
    ### Modifications
    
    Make the transaction abort on partitions operation get through.
---
 .../broker/TransactionMetadataStoreService.java    |  19 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    |   2 +-
 .../buffer/impl/PersistentTransactionBuffer.java   |  17 +--
 .../broker/transaction/TransactionProduceTest.java | 144 ++++++++++++++++-----
 .../pulsar/client/transaction/EndToEndTest.java    |  45 ++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   6 +-
 .../client/impl/transaction/TransactionImpl.java   |   8 +-
 7 files changed, 183 insertions(+), 58 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 066e588..b1ef17b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -194,7 +194,7 @@ public class TransactionMetadataStoreService {
         }
 
         completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
-                .thenCompose(ignored -> endToTB(txnID, newStatus));
+                .thenCompose(ignored -> endToTB(txnID, txnAction));
         if (TxnStatus.COMMITTING.equals(newStatus)) {
             completableFuture = completableFuture
                     .thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING));
@@ -205,7 +205,7 @@ public class TransactionMetadataStoreService {
         return completableFuture;
     }
 
-    private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
+    private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
         CompletableFuture<Void> resultFuture = new CompletableFuture<>();
         List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
         this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
@@ -214,16 +214,15 @@ public class TransactionMetadataStoreService {
                 return;
             }
             txnMeta.producedPartitions().forEach(partition -> {
-                CompletableFuture<TxnID> commitFuture = new CompletableFuture<>();
-                if (TxnStatus.COMMITTING.equals(newStatus)) {
-                    commitFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
-                } else if (TxnStatus.ABORTING.equals(newStatus)) {
-                    commitFuture.completeExceptionally(new Throwable("Unsupported operation."));
+                CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
+                if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+                    actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
+                } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+                    actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
                 } else {
-                    // Unsupported txnStatus
-                    commitFuture.completeExceptionally(new Throwable("Unsupported txnStatus."));
+                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                 }
-                commitFutureList.add(commitFuture);
+                commitFutureList.add(actionFuture);
             });
             try {
                 FutureUtil.waitForAll(commitFutureList).whenComplete((ignored, waitThrowable) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8d076ce..071ef80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1717,7 +1717,7 @@ public class ServerCnx extends PulsarHandler {
         final int txnAction = command.getTxnAction().getNumber();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
 
-        service.getTopics().get(command.getTopic()).whenComplete((topic, t) -> {
+        service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic, t) -> {
             if (!topic.isPresent()) {
                 ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                         command.getRequestId(), ServerError.TopicNotFound,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
index 65f787d..4c654ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
@@ -152,20 +152,15 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
 
     @Override
     public CompletableFuture<Void> endTxnOnPartition(TxnID txnID, int txnAction) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
-            committingTxn(txnID).whenComplete((ignored, throwable) -> {
-                if (throwable != null) {
-                    completableFuture.completeExceptionally(throwable);
-                    return;
-                }
-                completableFuture.complete(null);
-            });
+            future = committingTxn(txnID);
         } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
-            // TODO handle abort operation
-            completableFuture.complete(null);
+            future = abortTxn(txnID);
+        } else {
+            future.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
         }
-        return completableFuture;
+        return future;
     }
 
     private CompletableFuture<Void> committingTxn(TxnID txnID) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 37ccc84..edcfd17 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -115,7 +116,6 @@ public class TransactionProduceTest extends TransactionTestBase {
                 .topic(TOPIC_OUTPUT)
                 .sendTimeout(0, TimeUnit.SECONDS)
                 .enableBatching(false)
-                .roundRobinRouterBatchingPartitionSwitchFrequency(1)
                 .create();
 
         int messageCntPerPartition = 3;
@@ -140,35 +140,12 @@ public class TransactionProduceTest extends TransactionTestBase {
         }
 
         // the messageId callback can't be called before commit
-        futureList.forEach(messageIdFuture -> {
-            try {
-                messageIdFuture.get(1, TimeUnit.SECONDS);
-                Assert.fail("MessageId shouldn't be get before txn commit.");
-            } catch (Exception e) {
-                if (e instanceof TimeoutException) {
-                    log.info("This is a expected exception.");
-                } else {
-                    log.error("This exception is not expected.", e);
-                    Assert.fail("This exception is not expected.");
-                }
-            }
-        });
+        checkMessageId(futureList, false);
 
         tnx.commit().get();
 
-        Thread.sleep(3000L);
-
         // the messageId callback should be called after commit
-        futureList.forEach(messageIdFuture -> {
-            try {
-                MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
-                Assert.assertNotNull(messageId);
-                log.info("Tnx commit success! messageId: {}", messageId);
-            } catch (Exception e) {
-                log.error("Tnx commit failed! tnx: " + tnx, e);
-                Assert.fail("Tnx commit failed! tnx: " + tnx);
-            }
-        });
+        checkMessageId(futureList, true);
 
         for (int i = 0; i < TOPIC_PARTITION; i++) {
             // the target topic partition received the commit marker
@@ -220,13 +197,116 @@ public class TransactionProduceTest extends TransactionTestBase {
         System.out.println("finish test");
     }
 
+    @Test
+    public void produceAndAbortTest() throws Exception {
+        String topic = NAMESPACE1 + "/produce-abort-test";
+        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        TransactionImpl txn = (TransactionImpl) pulsarClientImpl.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+
+        @Cleanup
+        ProducerImpl<byte[]> outProducer = (ProducerImpl<byte[]>) pulsarClientImpl
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        int messageCnt = 10;
+        Set<String> messageSet = new HashSet<>();
+        List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
+        for (int i = 0; i < messageCnt; i++) {
+            String msg = "Hello Txn - " + i;
+            messageSet.add(msg);
+            CompletableFuture<MessageId> produceFuture = outProducer
+                    .newMessage(txn).value(msg.getBytes(UTF_8)).sendAsync();
+            futureList.add(produceFuture);
+        }
+
+        // the target topic hasn't the abort marker before commit
+        ReadOnlyCursor originTopicCursor = getOriginTopicCursor(topic, -1);
+        Assert.assertNotNull(originTopicCursor);
+        Assert.assertFalse(originTopicCursor.hasMoreEntries());
+        originTopicCursor.close();
+
+        // the messageId callback can't be called before commit
+        checkMessageId(futureList, false);
+
+        txn.abort().get();
+
+        // the messageId callback should be called after commit
+        checkMessageId(futureList, true);
+
+        // the target topic partition doesn't have any entries
+        originTopicCursor = getOriginTopicCursor(topic, -1);
+        Assert.assertNotNull(originTopicCursor);
+        Assert.assertFalse(originTopicCursor.hasMoreEntries());
+
+        // the target topic transactionBuffer should receive the transaction messages,
+        // committing marker and commit marker
+        ReadOnlyCursor tbTopicCursor = getTBTopicCursor(topic, -1);
+        Assert.assertNotNull(tbTopicCursor);
+        Assert.assertTrue(tbTopicCursor.hasMoreEntries());
+        long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
+        log.info("transaction buffer entries count: {}", tbEntriesCnt);
+        Assert.assertEquals(tbEntriesCnt, messageCnt + 1);
+
+        PulsarApi.MessageMetadata messageMetadata;
+        List<Entry> entries = tbTopicCursor.readEntries((int) tbEntriesCnt);
+        // check the messages
+        for (int i = 0; i < messageCnt; i++) {
+            messageMetadata = Commands.parseMessageMetadata(entries.get(i).getDataBuffer());
+            Assert.assertEquals(messageMetadata.getTxnidMostBits(), txn.getTxnIdMostBits());
+            Assert.assertEquals(messageMetadata.getTxnidLeastBits(), txn.getTxnIdLeastBits());
+
+            byte[] bytes = new byte[entries.get(i).getDataBuffer().readableBytes()];
+            entries.get(i).getDataBuffer().readBytes(bytes);
+            Assert.assertTrue(messageSet.remove(new String(bytes)));
+        }
+
+        // check abort marker
+        messageMetadata = Commands.parseMessageMetadata(entries.get(messageCnt).getDataBuffer());
+        Assert.assertEquals(PulsarMarkers.MarkerType.TXN_ABORT_VALUE, messageMetadata.getMarkerType());
+
+        Assert.assertEquals(0, messageSet.size());
+        log.info("finish test produceAndAbortTest.");
+    }
+
+    private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boolean isFinished) {
+        futureList.forEach(messageIdFuture -> {
+            try {
+                MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
+                if (isFinished) {
+                    Assert.assertNotNull(messageId);
+                    log.info("Tnx commit success! messageId: {}", messageId);
+                } else {
+                    Assert.fail("MessageId shouldn't be get before txn abort.");
+                }
+            } catch (Exception e) {
+                if (!isFinished) {
+                    if (e instanceof TimeoutException) {
+                        log.info("This is a expected exception.");
+                    } else {
+                        log.error("This exception is not expected.", e);
+                        Assert.fail("This exception is not expected.");
+                    }
+                } else {
+                    log.error("Tnx commit failed!", e);
+                    Assert.fail("Tnx commit failed!");
+                }
+            }
+        });
+    }
+
     private ReadOnlyCursor getTBTopicCursor(String topic, int partition) {
         try {
-            String tbTopicName = PersistentTransactionBuffer.getTransactionBufferTopicName(
-                    TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition);
+            String topicSuffix = partition >= 0 ? TopicName.PARTITIONED_TOPIC_SUFFIX + partition : "";
+            topic = PersistentTransactionBuffer.getTransactionBufferTopicName(
+                    TopicName.get(topic).toString() + topicSuffix);
 
             return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
-                    TopicName.get(tbTopicName).getPersistenceNamingEncoding(),
+                    TopicName.get(topic).getPersistenceNamingEncoding(),
                     PositionImpl.earliest, new ManagedLedgerConfig());
         } catch (Exception e) {
             log.error("Failed to get transaction buffer topic readonly cursor.", e);
@@ -237,9 +317,11 @@ public class TransactionProduceTest extends TransactionTestBase {
 
     private ReadOnlyCursor getOriginTopicCursor(String topic, int partition) {
         try {
-            String partitionTopic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+            if (partition >= 0) {
+                topic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+            }
             return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
-                    TopicName.get(partitionTopic).getPersistenceNamingEncoding(),
+                    TopicName.get(topic).getPersistenceNamingEncoding(),
                     PositionImpl.earliest, new ManagedLedgerConfig());
         } catch (Exception e) {
             log.error("Failed to get origin topic readonly cursor.", e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
index f9c4940..a1a5028 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
@@ -87,7 +87,7 @@ public class EndToEndTest extends TransactionTestBase {
     }
 
     @Test
-    public void test() throws Exception {
+    public void partitionCommitTest() throws Exception {
         Transaction txn = ((PulsarClientImpl) pulsarClient)
                 .newTransaction()
                 .withTransactionTimeout(2, TimeUnit.SECONDS)
@@ -149,4 +149,47 @@ public class EndToEndTest extends TransactionTestBase {
         log.info("receive transaction messages count: {}", receiveCnt);
     }
 
+    @Test
+    public void partitionAbortTest() throws Exception {
+        Transaction txn = ((PulsarClientImpl) pulsarClient)
+                .newTransaction()
+                .withTransactionTimeout(2, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        @Cleanup
+        PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient
+                .newProducer()
+                .topic(TOPIC_OUTPUT)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        int messageCnt = 10;
+        for (int i = 0; i < messageCnt; i++) {
+            producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+        }
+
+        @Cleanup
+        MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient
+                .newConsumer()
+                .topic(TOPIC_OUTPUT)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        // Can't receive transaction messages before abort.
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        txn.abort().get();
+
+        // Cant't receive transaction messages after abort.
+        message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        log.info("finished test partitionAbortTest");
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 4cb7a22..afc0195 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -269,6 +269,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     @Override
     CompletableFuture<MessageId> internalSendAsync(Message<?> message, Transaction txn) {
+        if (txn instanceof TransactionImpl) {
+            ((TransactionImpl) txn).registerProducedTopic(topic);
+        }
 
         CompletableFuture<MessageId> future = new CompletableFuture<>();
 
@@ -343,9 +346,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 nextCallback = scb;
             }
         });
-        if (txn instanceof TransactionImpl) {
-            ((TransactionImpl) txn).registerProducedTopic(topic);
-        }
         return future;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 368d805..38530b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -138,6 +138,12 @@ public class TransactionImpl implements Transaction {
 
     @Override
     public CompletableFuture<Void> abort() {
-        return FutureUtil.failedFuture(new UnsupportedOperationException("Not Implemented Yet"));
+        return tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((ignored, throwable) -> {
+            sendOps.values().forEach(txnSendOp -> {
+                txnSendOp.sendFuture.whenComplete(((messageId, t) -> {
+                    txnSendOp.transactionalSendFuture.complete(messageId);
+                }));
+            });
+        });
     }
 }