You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/27 11:33:34 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

codelipenghui commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r981112064


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -251,8 +274,8 @@ private CompletableFuture<Void> invalidTxnStatusFuture() {
 
     private CompletableFuture<Void> allOpComplete() {
         List<CompletableFuture<?>> futureList = new ArrayList<>();
-        futureList.addAll(sendFutureList);
-        futureList.addAll(ackFutureList);
+        futureList.add(ackFuture);
+        futureList.add(sendFuture);
         return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));

Review Comment:
   return CompletableFuture.allOf(ackFuture, sendFuture);



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -61,8 +64,12 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<MessageId> sendFuture;
+    private CompletableFuture<Void> ackFuture;
+
+    private final AtomicLong ackCount = new AtomicLong(0);
+    private final AtomicLong sendCount = new AtomicLong(0);

Review Comment:
   Please use a static AtomicLongFieldUpdater to avoid creating an AtomicLong instance for each Transaction.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +130,16 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (sendCount.getAndIncrement() == 0) {
+            sendFuture = new CompletableFuture<>();
+        }
+        newSendFuture.thenRun(() -> {

Review Comment:
   What if the future completes with an exception? `thenRun` only runs on normal completion, so we would miss the update of sendCount.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {

Review Comment:
   Why do we need a new thread here?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {
+            for (int i = 0; i < threadSize; i++) {
+                executorService.submit(() -> {
+                    try {
+                        for (int j = 0; j < totalMessage; j++) {
+                            CompletableFuture<MessageId> sendFuture = producer.newMessage(transaction).sendAsync();
+                            sendFutures.add(sendFuture);
+                            Message<byte[]> message = consumer.receive();
+                            CompletableFuture<Void> ackFuture = consumer.acknowledgeAsync(message.getMessageId(),
+                                    transaction);
+                            ackFutures.add(ackFuture);
+                        }
+                        countDownLatch.countDown();
+                    } catch (Exception e) {
+                        log.error("Failed to send/ack messages with transaction.", e);

Review Comment:
   Should the `countDownLatch` also be counted down in this failure path?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -97,8 +104,8 @@ public enum State {
         this.registerSubscriptionMap = new ConcurrentHashMap<>();
         this.tcClient = client.getTcClient();
 
-        this.sendFutureList = new ArrayList<>();
-        this.ackFutureList = new ArrayList<>();
+        this.sendFuture = new CompletableFuture<>();
+        this.ackFuture = new CompletableFuture<>();

Review Comment:
   Should it be a completed future or null?
   Otherwise, if there is no ack operation in this transaction, the future returned by allOpComplete will never complete.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +130,16 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (sendCount.getAndIncrement() == 0) {
+            sendFuture = new CompletableFuture<>();
+        }

Review Comment:
   It looks like the sendFuture and ackCount can be final,
   so that we don't need to add a check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org