You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/12 01:36:22 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request, #17777: [fix][txn] optimize the ack/send future in TransactionImpl

liangyepianzhou opened a new pull request, #17777:
URL: https://github.com/apache/pulsar/pull/17777

   ### Motivation
   The TransactionImpl stores a lot of future. This uses a lot of memory, and can be optimized to two futures. ### Modification
   Optimize the future list to single future.
   
   ### Documentation
   - [x] `doc-not-needed` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r983129636


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {

Review Comment:
   Delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#issuecomment-1275499682

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17777?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > :exclamation: No coverage uploaded for pull request base (`master@782764f`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #17777   +/-   ##
   =========================================
     Coverage          ?   24.56%           
     Complexity        ?     3222           
   =========================================
     Files             ?      393           
     Lines             ?    43419           
     Branches          ?     4462           
   =========================================
     Hits              ?    10668           
     Misses            ?    31066           
     Partials          ?     1685           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `24.56% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r981112064


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -251,8 +274,8 @@ private CompletableFuture<Void> invalidTxnStatusFuture() {
 
     private CompletableFuture<Void> allOpComplete() {
         List<CompletableFuture<?>> futureList = new ArrayList<>();
-        futureList.addAll(sendFutureList);
-        futureList.addAll(ackFutureList);
+        futureList.add(ackFuture);
+        futureList.add(sendFuture);
         return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));

Review Comment:
   return CompletableFuture.allOf(ackFuture, sendFuture);



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -61,8 +64,12 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<MessageId> sendFuture;
+    private CompletableFuture<Void> ackFuture;
+
+    private final AtomicLong ackCount = new AtomicLong(0);
+    private final AtomicLong sendCount = new AtomicLong(0);

Review Comment:
   Please use the Static AtomicLongFieldUpdater to avoid create AtomicLong instance for each Transaction



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +130,16 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (sendCount.getAndIncrement() == 0) {
+            sendFuture = new CompletableFuture<>();
+        }
+        newSendFuture.thenRun(() -> {

Review Comment:
   How about the future completed with an exception? We will miss the update of the sendCount



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {

Review Comment:
   Why need a new thread here.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {
+            for (int i = 0; i < threadSize; i++) {
+                executorService.submit(() -> {
+                    try {
+                        for (int j = 0; j < totalMessage; j++) {
+                            CompletableFuture<MessageId> sendFuture = producer.newMessage(transaction).sendAsync();
+                            sendFutures.add(sendFuture);
+                            Message<byte[]> message = consumer.receive();
+                            CompletableFuture<Void> ackFuture = consumer.acknowledgeAsync(message.getMessageId(),
+                                    transaction);
+                            ackFutures.add(ackFuture);
+                        }
+                        countDownLatch.countDown();
+                    } catch (Exception e) {
+                        log.error("Failed to send/ack messages with transaction.", e);

Review Comment:
   Should also update the `countDownLatch`?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -97,8 +104,8 @@ public enum State {
         this.registerSubscriptionMap = new ConcurrentHashMap<>();
         this.tcClient = client.getTcClient();
 
-        this.sendFutureList = new ArrayList<>();
-        this.ackFutureList = new ArrayList<>();
+        this.sendFuture = new CompletableFuture<>();
+        this.ackFuture = new CompletableFuture<>();

Review Comment:
   It should be completed future or null?
   Otherwise, if you don't have ack operation in this transaction, the allOpComplete method will never be completed.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +130,16 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (sendCount.getAndIncrement() == 0) {
+            sendFuture = new CompletableFuture<>();
+        }

Review Comment:
   Looks like the sendFuture and ackCount can be final.
   So that we don't need to add a check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r983069566


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +130,16 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (sendCount.getAndIncrement() == 0) {
+            sendFuture = new CompletableFuture<>();
+        }

Review Comment:
   @liangyepianzhou Ignore this one.
   
   It related here https://github.com/apache/pulsar/pull/17777#discussion_r981114108
   I see the constructor already set sendFuture and ackFuture to `new CompletableFuture<>()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r990895203


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +125,23 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
+            opFuture = new CompletableFuture<>();

Review Comment:
   If the transaction has three ops, and all of the ops are completed.
   
   Then a new op happened on this transaction, we will miss the new op, right?
   Because the user can able to commit or abort the transaction without waiting the last op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r983129366


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -61,8 +64,12 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<MessageId> sendFuture;
+    private CompletableFuture<Void> ackFuture;
+
+    private final AtomicLong ackCount = new AtomicLong(0);
+    private final AtomicLong sendCount = new AtomicLong(0);

Review Comment:
   We do need an AtomicLong for each Transaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r979206264


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +132,25 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        sendLock.lock();
+        try {
+            if (sendCount.getAndIncrement() == 0) {
+                sendFuture = new CompletableFuture<>();
+            }
+        } finally {
+            sendLock.unlock();
+        }
+        newSendFuture.thenRun(() -> {
+            sendLock.lock();

Review Comment:
   
   1. decrease the opCount 1 -> 0 
   2. check opCount == 0, and then increase the opCount 0 -> 1
   3. future = new future;
   4. future.complete
   5. finally, the future is completely and opCount is 1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou merged pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
liangyepianzhou merged PR #17777:
URL: https://github.com/apache/pulsar/pull/17777


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r983129366


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -61,8 +64,12 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<MessageId> sendFuture;
+    private CompletableFuture<Void> ackFuture;
+
+    private final AtomicLong ackCount = new AtomicLong(0);
+    private final AtomicLong sendCount = new AtomicLong(0);

Review Comment:
   We do need an AtomicLong for each Transaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui closed pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl
URL: https://github.com/apache/pulsar/pull/17777


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r980116725


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {
+            for (int i = 0; i < threadSize; i++) {
+                executorService.submit(() -> {
+                    try {
+                        for (int j = 0; j < totalMessage; j++) {
+                            CompletableFuture<MessageId> sendFuture = producer.newMessage(transaction).sendAsync();
+                            sendFutures.add(sendFuture);
+                            Message<byte[]> message = consumer.receive();
+                            CompletableFuture<Void> ackFuture = consumer.acknowledgeAsync(message.getMessageId(),
+                                    transaction);
+                            ackFutures.add(ackFuture);
+                        }
+                        countDownLatch.countDown();
+                    } catch (Exception e) {
+                        log.error("Failed to send/ack messages with transaction.", e);
+                    }
+                });
+            }
+        }).start();
+        //wait the all send/ack op is excuted and store its futures in the arraylist.
+        countDownLatch.await(10, TimeUnit.SECONDS);
+        transaction.commit().get();
+
+        //verify the final status is right.
+        Field ackCountField = TransactionImpl.class.getDeclaredField("ackCount");

Review Comment:
   can use WhiteboxImpl



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -61,8 +64,12 @@ public class TransactionImpl implements Transaction , TimerTask {
     private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<MessageId> sendFuture;
+    private CompletableFuture<Void> ackFuture;
+
+    private AtomicLong ackCount = new AtomicLong(0);

Review Comment:
   final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r979200390


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +132,25 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        sendLock.lock();
+        try {
+            if (sendCount.getAndIncrement() == 0) {
+                sendFuture = new CompletableFuture<>();
+            }
+        } finally {
+            sendLock.unlock();
+        }
+        newSendFuture.thenRun(() -> {
+            sendLock.lock();

Review Comment:
   why do we need this lock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17777: [fix][txn] optimize the ack/send future in TransactionImpl

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r981219172


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -123,8 +130,16 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
         return completableFuture;
     }
 
-    public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
-        sendFutureList.add(sendFuture);
+    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
+        if (sendCount.getAndIncrement() == 0) {
+            sendFuture = new CompletableFuture<>();
+        }

Review Comment:
   Hello Penghui, I do not understand why the sendFuture can be final.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org