You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 08:22:33 UTC

[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

liangyepianzhou commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r902305801


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2682,6 +2700,58 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
         return cnx().newAckForReceipt(cmd, requestId);
     }
 
+    private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
+                                                                        ValidationError validationError,
+                                                                        Map<String, Long> properties, TxnID txnID) {
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        ByteBuf cmd;
+        long requestId = client.newRequestId();
+        List<MessageIdData> messageIdDataList = new LinkedList<>();
+        for (MessageId messageId : messageIds) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                bitSetRecyclable = BitSetRecyclable.create();
+                if (ackType == AckType.Cumulative) {
+                    batchMessageId.ackCumulative();
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+                } else {
+                    bitSetRecyclable.set(0, batchMessageId.getBatchSize());
+                    bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+                }
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.setLedgerId(batchMessageId.getLedgerId());
+                messageIdData.setEntryId(batchMessageId.getEntryId());
+                messageIdData.setBatchSize(batchMessageId.getBatchSize());
+                long[] as = bitSetRecyclable.toLongArray();
+                for (int i = 0; i < as.length; i++) {
+                    messageIdData.addAckSet(as[i]);

Review Comment:
   They are different because ack messages need messageIdData.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org