You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 03:22:22 UTC

[pulsar] branch branch-2.7 updated: [Transaction] Fix transaction messages order error and deduplication error (#9024)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f6240a0  [Transaction] Fix transaction messages order error and deduplication error (#9024)
f6240a0 is described below

commit f6240a09d2bc2c893345d87eaaf706ae5ab42267
Author: ran <ga...@126.com>
AuthorDate: Wed Dec 23 11:20:39 2020 +0800

    [Transaction] Fix transaction messages order error and deduplication error (#9024)
    
    ### Motivation
    
    Currently, transaction messages can be produced in the wrong order, and the deduplication check does not work correctly for them.
    
    ### Modifications
    
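    - Broker: `TopicTransactionBuffer` now appends transaction messages and commit/abort markers directly to the managed ledger via `asyncAddEntry` instead of calling `topic.publishMessage`.
    - Client: `TypedMessageBuilderImpl` no longer sets a sequence ID taken from the transaction, and `TransactionImpl#nextSequenceId` is removed.
    - Client: `TransactionImpl` tracks the registration future for each produced topic and acked topic in a `ConcurrentHashMap`, so a repeated registration returns a future that depends on the original request instead of completing immediately.
    - Tests: broker deduplication is enabled in `TransactionTestBase`.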
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    
    - *org.apache.pulsar.client.impl.TransactionEndToEndTest#produceTxnMessageOrderTest*
    
    (cherry picked from commit a5c9c159adfdb5102b490e42fdb189e9c3c4ecaf)
---
 .../buffer/impl/TopicTransactionBuffer.java        | 58 ++++++++++++++--------
 .../broker/transaction/TransactionTestBase.java    |  1 +
 .../client/impl/TransactionEndToEndTest.java       | 39 +++++++++++++++
 .../client/impl/TypedMessageBuilderImpl.java       |  4 +-
 .../client/impl/transaction/TransactionImpl.java   | 51 +++++++++----------
 5 files changed, 103 insertions(+), 50 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c63d5d1..c8f4fbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -23,8 +23,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -54,14 +55,18 @@ public class TopicTransactionBuffer implements TransactionBuffer {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
-            if (e != null) {
-                log.error("Failed to append buffer to txn {}", txnId, e);
-                completableFuture.completeExceptionally(e);
-                return;
+        topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                completableFuture.complete(position);
             }
-            completableFuture.complete(PositionImpl.get(ledgerId, entryId));
-        });
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Failed to append buffer to txn {}", txnId, exception);
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null);
         return completableFuture;
     }
 
@@ -78,14 +83,19 @@ public class TopicTransactionBuffer implements TransactionBuffer {
 
         ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(),
                 txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
-        topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
-            if (e != null) {
-                log.error("Failed to commit for txn {}", txnID, e);
-                completableFuture.completeExceptionally(e);
-                return;
+
+        topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                completableFuture.complete(null);
             }
-            completableFuture.complete(null);
-        });
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Failed to commit for txn {}", txnID, exception);
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null);
         return completableFuture;
     }
 
@@ -98,14 +108,18 @@ public class TopicTransactionBuffer implements TransactionBuffer {
 
         ByteBuf abortMarker = Markers.newTxnAbortMarker(
                 -1L, txnID.getMostSigBits(), txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
-        topic.publishMessage(abortMarker, (e, ledgerId, entryId) -> {
-            if (e != null) {
-                log.error("Failed to abort for txn {}", txnID, e);
-                completableFuture.completeExceptionally(e);
-                return;
+        topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                completableFuture.complete(null);
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Failed to abort for txn {}", txnID, exception);
+                completableFuture.completeExceptionally(exception);
             }
-            completableFuture.complete(null);
-        });
+        }, null);
         return completableFuture;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 8e1a605..d061c82 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -116,6 +116,7 @@ public class TransactionTestBase {
             conf.setWebServicePort(Optional.of(0));
             conf.setWebServicePortTls(Optional.of(0));
             conf.setTransactionCoordinatorEnabled(true);
+            conf.setBrokerDeduplicationEnabled(true);
             serviceConfigurationList.add(conf);
 
             PulsarService pulsar = spy(new PulsarService(conf));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index db3d9fe..104eb97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -570,6 +570,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
     @Test
     public void txnMetadataHandlerRecoverTest() throws Exception {
         String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .sendTimeout(0, TimeUnit.SECONDS)
@@ -592,6 +593,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
 
         pulsarClient.close();
+        @Cleanup
         PulsarClientImpl recoverPulsarClient = (PulsarClientImpl) PulsarClient.builder()
                 .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                 .statsInterval(0, TimeUnit.SECONDS)
@@ -603,6 +605,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
             tcClient.commit(entry.getKey(), entry.getValue());
         }
 
+        @Cleanup
         Consumer<byte[]> consumer = recoverPulsarClient.newConsumer()
                 .topic(topic)
                 .subscriptionName("test")
@@ -615,4 +618,40 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void produceTxnMessageOrderTest() throws Exception {
+        String topic = NAMESPACE1 + "/txn-produce-order";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .producerName("txn-publish-order")
+                .create();
+
+        for (int ti = 0; ti < 10; ti++) {
+            Transaction txn = pulsarClient
+                    .newTransaction()
+                    .withTransactionTimeout(2, TimeUnit.SECONDS)
+                    .build().get();
+
+            for (int i = 0; i < 1000; i++) {
+                producer.newMessage(txn).value(("" + i).getBytes()).sendAsync();
+            }
+            txn.commit().get();
+
+            for (int i = 0; i < 1000; i++) {
+                Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+                Assert.assertNotNull(message);
+                Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
+            }
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index c922b74..b911266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -74,9 +74,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
         }
         msgMetadataBuilder.setTxnidLeastBits(txn.getTxnIdLeastBits());
         msgMetadataBuilder.setTxnidMostBits(txn.getTxnIdMostBits());
-        long sequenceId = txn.nextSequenceId();
-        msgMetadataBuilder.setSequenceId(sequenceId);
-        return sequenceId;
+        return -1L;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 3036c07..5efb58c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
@@ -55,10 +57,9 @@ public class TransactionImpl implements Transaction {
     private final long transactionTimeoutMs;
     private final long txnIdLeastBits;
     private final long txnIdMostBits;
-    private final AtomicLong sequenceId = new AtomicLong(0L);
 
-    private final Set<String> producedTopics;
-    private final Set<String> ackedTopics;
+    private final Map<String, CompletableFuture<Void>> registerPartitionMap;
+    private final Map<String, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
     private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
 
@@ -74,29 +75,27 @@ public class TransactionImpl implements Transaction {
         this.txnIdLeastBits = txnIdLeastBits;
         this.txnIdMostBits = txnIdMostBits;
 
-        this.producedTopics = new HashSet<>();
-        this.ackedTopics = new HashSet<>();
+        this.registerPartitionMap = new ConcurrentHashMap<>();
+        this.registerSubscriptionMap = new ConcurrentHashMap<>();
         this.tcClient = client.getTcClient();
 
         this.sendFutureList = new ArrayList<>();
         this.ackFutureList = new ArrayList<>();
     }
 
-    public long nextSequenceId() {
-        return sequenceId.getAndIncrement();
-    }
-
     // register the topics that will be modified by this transaction
     public synchronized CompletableFuture<Void> registerProducedTopic(String topic) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        if (producedTopics.add(topic)) {
-            // we need to issue the request to TC to register the produced topic
-            completableFuture = tcClient.addPublishPartitionToTxnAsync(
-                    new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic));
-        } else {
-            completableFuture.complete(null);
-        }
-        return completableFuture;
+        // we need to issue the request to TC to register the produced topic
+        return registerPartitionMap.compute(topic, (key, future) -> {
+            if (future != null) {
+                return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
+            } else {
+                return tcClient.addPublishPartitionToTxnAsync(
+                            new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic))
+                        .thenCompose(ignored -> CompletableFuture.completedFuture(null));
+            }
+        });
     }
 
     public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
@@ -106,14 +105,16 @@ public class TransactionImpl implements Transaction {
     // register the topics that will be modified by this transaction
     public synchronized CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        if (ackedTopics.add(topic)) {
-            // we need to issue the request to TC to register the acked topic
-            completableFuture = tcClient.addSubscriptionToTxnAsync(
-                    new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription);
-        } else {
-            completableFuture.complete(null);
-        }
-        return completableFuture;
+        // we need to issue the request to TC to register the acked topic
+        return registerSubscriptionMap.compute(topic, (key, future) -> {
+            if (future != null) {
+                return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
+            } else {
+                return tcClient.addSubscriptionToTxnAsync(
+                        new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription)
+                        .thenCompose(ignored -> CompletableFuture.completedFuture(null));
+            }
+        });
     }
 
     public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {