You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/24 02:03:07 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request, #15729: [improve][txn] Support ack message list for transaction

liangyepianzhou opened a new pull request, #15729:
URL: https://github.com/apache/pulsar/pull/15729

   ### Motivation
   Now, there is
   ```java
   doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
                                                              Map<String, Long> properties, TransactionImpl txn)
   ```
   But not interface
   ```java
   acknowledgeAsync(List<MessageId> messageIdList, Transaction txn)
   ```
   ### Modification
   Add interface
   ```java
   acknowledgeAsync(List<MessageId> messageIdList, Transaction txn)
   ```
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r913169454


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2704,6 +2722,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {

Review Comment:
   Do we need to check if existing ack position conflication? It seems that the broker also will check conflication, maybe we could add conflication check at client.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r881413533


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -492,11 +492,30 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
         return acknowledgeAsync(messageIds);
     }
 
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) {
+        List<MessageId> messageIds = new ArrayList<>(messages.size());
+        for (Message<?> message: messages) {
+            try {
+                validateMessageId(message);
+            } catch (PulsarClientException e) {
+                return FutureUtil.failedFuture(e);
+            }
+            messageIds.add(message.getMessageId());
+        }
+        return acknowledgeAsync(messageIds, txn);
+    }
+
     @Override
     public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
         return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null);
     }
 
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn) {
+        return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn);

Review Comment:
   should register the partition topic and doAckForResponse



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r880019671


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -997,4 +998,47 @@ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
 
         transaction.commit().get();
     }
+
+    @Test
+    public void testTransactionAckMessageList() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 +"/test";
+        String subName = "testSub";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()

Review Comment:
   Please add the annotation `@Cleanup` for the producer.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -997,4 +998,47 @@ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
 
         transaction.commit().get();
     }
+
+    @Test
+    public void testTransactionAckMessageList() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 +"/test";
+        String subName = "testSub";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().send();
+        }
+        List<MessageId> messages = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            Message<byte[]> message = consumer.receive();
+            messages.add(message.getMessageId());
+        }
+        Transaction transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+
+        consumer.acknowledgeAsync(messages, transaction);
+        transaction.commit().get();
+
+        consumer.close();
+        consumer = pulsarClient.newConsumer()

Review Comment:
   Please close the consumer finally.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -997,4 +998,47 @@ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
 
         transaction.commit().get();
     }
+
+    @Test

Review Comment:
   Please add a timeout param for the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r887634742


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -414,13 +414,6 @@ void reconsumeLater(Message<?> message,
      * @param messageId
      *            The {@code MessageId} to be cumulatively acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
-     *             different from the transaction in pending ack.
-     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction

Review Comment:
   don't delete this



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -578,11 +579,32 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
     }
 
     @Override
-    protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
-                                                    AckType ackType,
-                                                    Map<String, Long> properties,
-                                                    TransactionImpl txn) {
-        return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
+    protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
+                                                    Map<String, Long> properties, TransactionImpl txn) {
+
+        for (MessageId messageId : messageIdList) {
+            checkArgument(messageId instanceof MessageIdImpl);
+        }
+        if (getState() != State.Ready && getState() != State.Connecting) {
+            stats.incrementNumAcksFailed();
+            PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageIdList, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageIdList, exception);
+            }
+            return FutureUtil.failedFuture(exception);
+        }
+        if (txn != null) {
+            List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
+            for (MessageId messageId : messageIdList) {
+                completableFutures.add(doTransactionAcknowledgeForResponse(messageId, ackType, null,

Review Comment:
   the proto support send messgeIdList, so you should rewrite a command is better.
   https://github.com/apache/pulsar/blob/9b01d1c472aa5cde5205ad86fa3b8f998da95f35/pulsar-common/src/main/proto/PulsarApi.proto#L553



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -469,13 +462,6 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
      *
      * @param messageId {@link MessageId} to be individual acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId has been acked by another transaction
-     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     *             don't find batch size in consumer pending ack

Review Comment:
   if you want to delete this, please add the sync method in the consumer API



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -606,12 +625,21 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
                                                            Map<String, Long> properties,
                                                            TransactionImpl txn) {
         CompletableFuture<Void> ackFuture;
-        if (txn != null) {
+        if (txn != null && this instanceof ConsumerImpl) {
+
+            // it is okay that we register acked topic after sending the acknowledgements. because
+            // the transactional ack will not be visiable for consumers until the transaction is
+            // committed
+            if (ackType == AckType.Cumulative) {
+                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
+            }
+
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));

Review Comment:
   we don't need regist ack in this code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r913169454


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2704,6 +2722,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {

Review Comment:
   Do we need to check if existing ack position conflication? It seems that the broker will check conflication, maybe we could add conflication check at client first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r881107297


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -491,13 +491,51 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
      */
     CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
 
+
+    /**
+     * Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack.
+     * After the transaction commit, the message will actually ack.
+     * After the transaction abort, the message will be redelivered.
+     * @param messages
+     *            The {@link Messages} to be acknowledged
+     * @param txn {@link Transaction} The transaction to ack messages.
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+     *             if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
+     *             different from the transaction in pending ack.
+     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+     *             broker don't support transaction
+     * @return {@link CompletableFuture} the future of the ack result
+     * */
+    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) throws PulsarClientException;
+
+
     /**
      * Asynchronously acknowledge the consumption of a list of message.
      * @param messageIdList
      * @return
      */
     CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
 
+
+    /**
+     * Acknowledge the consumption of a list of message, it will store in pending ack.
+     * After the transaction commit, the message will actually ack.
+     * After the transaction abort, the message will be redelivered.
+     * @param messageIdList A list of message Id.
+     * @param txn {@link Transaction} The transaction to ack messages.
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+     *             if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
+     *             different from the transaction in pending ack.
+     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+     *             broker don't support transaction
+     * @return {@link CompletableFuture} the future of the ack result     */
+    CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn)
+            throws PulsarClientException;

Review Comment:
   And here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou merged pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
liangyepianzhou merged PR #15729:
URL: https://github.com/apache/pulsar/pull/15729


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r913169454


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2704,6 +2722,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {

Review Comment:
   Do we need to check if existing ack position conflication? It seems that the broker will also check conflication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r902305801


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2682,6 +2700,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         return cnx().newAckForReceipt(cmd, requestId);
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                }
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.setLedgerId(batchMessageId.getLedgerId());
+                messageIdData.setEntryId(batchMessageId.getEntryId());
+                messageIdData.setBatchSize(batchMessageId.getBatchSize());
+                long[] as = bitSetRecyclable.toLongArray();
+                for (int i = 0; i < as.length; i++) {
+                    messageIdData.addAckSet(as[i]);

Review Comment:
   They are different because ack messages need messageIdData.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r913169454


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2704,6 +2722,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {

Review Comment:
   Do we need to check if existing ack position conflication? It seems that the broker will check conflication, maybe we could add conflication check at client.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r881106921


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -491,13 +491,51 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
      */
     CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
 
+
+    /**
+     * Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack.
+     * After the transaction commit, the message will actually ack.
+     * After the transaction abort, the message will be redelivered.
+     * @param messages
+     *            The {@link Messages} to be acknowledged
+     * @param txn {@link Transaction} The transaction to ack messages.
+     * @throws PulsarClientException.AlreadyClosedException
+     *             if the consumer was already closed
+     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
+     *             if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
+     *             different from the transaction in pending ack.
+     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
+     *             broker don't support transaction
+     * @return {@link CompletableFuture} the future of the ack result
+     * */
+    CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) throws PulsarClientException;
+

Review Comment:
   It's better not throw exception when using CompletableFuture as return type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r901420562


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -1011,6 +1011,37 @@ public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSe
         return serializeWithSize(cmd);
     }
 
+    public static ByteBuf newAck(long consumerId, List<MessageIdData> messageIds, AckType ackType,
+                                 ValidationError validationError, Map<String, Long> properties, long txnIdLeastBits,
+                                 long txnIdMostBits, long requestId) {
+        BaseCommand cmd = localCmd(Type.ACK);
+        CommandAck ack = cmd.setAck()
+                .setConsumerId(consumerId)
+                .setAckType(ackType);
+        ack.addAllMessageIds(messageIds);
+
+        if (validationError != null) {
+            ack.setValidationError(validationError);
+        }
+        if (txnIdMostBits >= 0) {
+            ack.setTxnidMostBits(txnIdMostBits);
+        }
+        if (txnIdLeastBits >= 0) {
+            ack.setTxnidLeastBits(txnIdLeastBits);
+        }
+
+        if (requestId >= 0) {
+            ack.setRequestId(requestId);
+        }
+        if (!properties.isEmpty()) {
+            properties.forEach((k, v) -> {
+                ack.addProperty().setKey(k).setValue(v);
+            });
+        }

Review Comment:
   same as 
   ```
       public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
                                    ValidationError validationError, Map<String, Long> properties, long txnIdLeastBits,
                                    long txnIdMostBits, long requestId, int batchSize) {
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2682,6 +2700,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         return cnx().newAckForReceipt(cmd, requestId);
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                }
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.setLedgerId(batchMessageId.getLedgerId());
+                messageIdData.setEntryId(batchMessageId.getEntryId());
+                messageIdData.setBatchSize(batchMessageId.getBatchSize());
+                long[] as = bitSetRecyclable.toLongArray();
+                for (int i = 0; i < as.length; i++) {
+                    messageIdData.addAckSet(as[i]);

Review Comment:
   same as
   
   ```
   private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
                                                                           ValidationError validationError,
                                                                           Map<String, Long> properties, TxnID txnID) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#issuecomment-1147338453

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r911909322


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -497,11 +497,30 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
         return acknowledgeAsync(messageIds);
     }
 
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) {
+        List<MessageId> messageIds = new ArrayList<>(messages.size());
+        for (Message<?> message: messages) {
+            try {
+                validateMessageId(message);
+            } catch (PulsarClientException e) {
+                return FutureUtil.failedFuture(e);
+            }
+            messageIds.add(message.getMessageId());
+        }

Review Comment:
   the same as `public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2699,6 +2717,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());

Review Comment:
   same as
   ```
   private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
                                                                           ValidationError validationError,
                                                                           Map<String, Long> properties, TxnID txnID) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r913338803


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2704,6 +2722,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {

Review Comment:
   Yeah, the broker will check confliction.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2704,6 +2722,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         }
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {

Review Comment:
   Yeah, the broker will check conflication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org