Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/22 15:46:36 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request, #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

BewareMyPower opened a new pull request, #19031:
URL: https://github.com/apache/pulsar/pull/19031

   Fixes https://github.com/apache/pulsar/issues/19030
   
   ### Motivation
   
   When a `BatchMessageIdImpl` is created by deserialization, its `BatchMessageAcker` is not shared with the other instances of the same batch, which causes an acknowledgment failure when batch index ACK is disabled (the default).
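   A minimal sketch of the failure, using a hypothetical `SketchAcker` that only models the idea (a bitmap of unacknowledged batch indexes) and is not the real `BatchMessageAcker`: when every ID of a batch carries its own acker, no single acker ever sees the whole batch acknowledged, so the entry is never acknowledged.
   
   ```java
   import java.util.BitSet;
   
   // Hypothetical, simplified acker: one bit per message in the batch, a set bit means "not yet acknowledged".
   final class SketchAcker {
       private final BitSet pending;
   
       SketchAcker(int batchSize) {
           pending = new BitSet(batchSize);
           pending.set(0, batchSize);
       }
   
       // Clears one index; returns true once every message in the batch is acknowledged.
       synchronized boolean ackIndividual(int batchIndex) {
           pending.clear(batchIndex);
           return pending.isEmpty();
       }
   }
   
   final class PerInstanceAckerDemo {
       // Acknowledges every index of one batch and counts how many acks saw the batch complete.
       static int completions(int batchSize, boolean shareAcker) {
           SketchAcker shared = new SketchAcker(batchSize);
           int completed = 0;
           for (int i = 0; i < batchSize; i++) {
               // IDs taken from received messages share one acker; each deserialized ID gets its own.
               SketchAcker acker = shareAcker ? shared : new SketchAcker(batchSize);
               if (acker.ackIndividual(i)) {
                   completed++;
               }
           }
           return completed;
       }
   
       public static void main(String[] args) {
           System.out.println("shared acker: " + completions(3, true));        // 1
           System.out.println("per-instance ackers: " + completions(3, false)); // 0
       }
   }
   ```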
   
   ### Modifications
   
   Maintain a map from the `(ledger id, entry id)` pair to the `BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl` doesn't carry a valid `BatchMessageAcker`, create and cache a `BatchMessageAcker` instance and remove it when all messages in the batch are acknowledged.
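   A minimal sketch of such a cache, assuming a hypothetical `BatchAckerCache` in which `SimpleImmutableEntry<Long, Long>` stands in for the `(ledger id, entry id)` pair key and a plain `BitSet` stands in for `BatchMessageAcker`. It is not the PR's code and skips edge cases such as a duplicate ack arriving after the entry was removed.
   
   ```java
   import java.util.AbstractMap.SimpleImmutableEntry;
   import java.util.BitSet;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical consumer-side cache: (ledgerId, entryId) -> bitmap of unacknowledged batch indexes.
   final class BatchAckerCache {
       private final Map<SimpleImmutableEntry<Long, Long>, BitSet> ackers = new ConcurrentHashMap<>();
   
       // Returns true when this ack completes the batch, i.e. the whole entry can be acknowledged.
       boolean acknowledge(long ledgerId, long entryId, int batchIndex, int batchSize) {
           SimpleImmutableEntry<Long, Long> key = new SimpleImmutableEntry<>(ledgerId, entryId);
           BitSet pending = ackers.computeIfAbsent(key, k -> {
               BitSet bits = new BitSet(batchSize);
               bits.set(0, batchSize);
               return bits;
           });
           synchronized (pending) {
               pending.clear(batchIndex);
               if (!pending.isEmpty()) {
                   return false;
               }
           }
           ackers.remove(key); // the batch is done, so drop its cached acker
           return true;
       }
   
       int size() {
           return ackers.size();
       }
   
       public static void main(String[] args) {
           BatchAckerCache cache = new BatchAckerCache();
           System.out.println(cache.acknowledge(10L, 3L, 0, 2)); // false: index 1 is still pending
           System.out.println(cache.acknowledge(10L, 3L, 1, 2)); // true: entry 10:3 can be acknowledged
           System.out.println(cache.size());                     // 0
       }
   }
   ```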
   
   This requires changing `MessageIdImpl#fromByteArray` so that it creates a `BatchMessageAckerDisabled` to indicate that there is no shared acker.
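   A sketch of why deserialization needs such a marker, using hypothetical `SketchBatchId` and `SketchIdAcker` types and an assumed 24-byte layout: only the position fields survive the round trip, so `fromBytes` attaches a shared `DISABLED` sentinel instead of a live acker.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Hypothetical acker type; DISABLED plays the role of BatchMessageAckerDisabled ("no shared acker").
   class SketchIdAcker {
       static final SketchIdAcker DISABLED = new SketchIdAcker();
   }
   
   // Hypothetical batch message ID. The 24-byte layout is an assumption for illustration;
   // the real MessageIdImpl#toByteArray format is different.
   final class SketchBatchId {
       final long ledgerId;
       final long entryId;
       final int batchIndex;
       final int batchSize;
       final SketchIdAcker acker;
   
       SketchBatchId(long ledgerId, long entryId, int batchIndex, int batchSize, SketchIdAcker acker) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
           this.batchIndex = batchIndex;
           this.batchSize = batchSize;
           this.acker = acker;
       }
   
       byte[] toBytes() {
           // Only the position fields are written; the in-memory acker cannot travel with the bytes.
           return ByteBuffer.allocate(24)
                   .putLong(ledgerId).putLong(entryId).putInt(batchIndex).putInt(batchSize)
                   .array();
       }
   
       static SketchBatchId fromBytes(byte[] bytes) {
           ByteBuffer buf = ByteBuffer.wrap(bytes);
           long ledgerId = buf.getLong();
           long entryId = buf.getLong();
           int batchIndex = buf.getInt();
           int batchSize = buf.getInt();
           // Mark the ID so the consumer knows to look up (or create) the acker in its own cache.
           return new SketchBatchId(ledgerId, entryId, batchIndex, batchSize, SketchIdAcker.DISABLED);
       }
   
       public static void main(String[] args) {
           SketchBatchId original = new SketchBatchId(3L, 7L, 1, 4, new SketchIdAcker());
           SketchBatchId copy = SketchBatchId.fromBytes(original.toBytes());
           System.out.println(copy.entryId + " " + (copy.acker == SketchIdAcker.DISABLED)); // 7 true
       }
   }
   ```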
   
   To avoid making the code more complicated, this patch refactors the existing code so that much of the consumer-related logic moves from the ACK tracker into the consumer. It also removes the `AckType` parameter when acknowledging a list of messages.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/15


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1656514102

   The PR had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058733630


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,52 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    @Nullable
+    private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));
+        } else {
+            acker = messageId.getAcker();
+        }
+        if (ackType == AckType.Individual) {
+            if (acker.ackIndividual(messageId.getBatchIndex())) {
+                batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
+                return messageId.toMessageIdImpl();
+            } else {
+                return conf.isBatchIndexAckEnabled() ? messageId : null;
+            }
+        } else {
+            if (acker.ackCumulative(messageId.getBatchIndex())) {
+                batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));

Review Comment:
   It makes sense to me. I will handle this and add a new test for it.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1060257731


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   On second thought: if we have to use completely new `BatchMessageIdImpl` objects in regular cases (i.e. when retrieving the `MessageId` from a `Message`), then in the same situations the cache should be cleared for `BatchMessageIdImpl` objects retrieved from deserialization.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058125642


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -267,10 +194,9 @@ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<Str
     }
 
 
-    private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
+    private void doIndividualAckAsync(MessageIdImpl messageId) {

Review Comment:
   The current names of these two "Async" methods are confusing. I will rename them.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059642489


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > Users should not continue to process/ack the messages before the seek operation.
   
   This implicit rule is not documented, and even if it were, it could be a burden on users.
   
   Assume there is a set of message IDs, and consider the following steps:
   1. Acknowledge some message IDs.
   2. Perform some operations, e.g. `seek`.
   3. Acknowledge other message IDs.
   
   ```java
   consumer.acknowledge(msgId1);
   consumer.doSomething(); // pseudo code
   consumer.acknowledge(msgId2);
   ```
   
   Under this rule, the steps above could produce a different result from acknowledging the whole set of message IDs directly:
   
   ```java
   consumer.acknowledge(msgId1);
   consumer.acknowledge(msgId2);
   ```
   
   Even more confusing, under this rule the consumer can get the same result by acknowledging another message ID that has the same position as `msgId1`, i.e.
   
   ```java
   // CODE 1
   consumer.acknowledge(msgId1);
   consumer.doSomething();
   consumer.acknowledge(msgId2);
   consumer.acknowledge(msgId3);
   // msgId3.getLedgerId() == msgId1.getLedgerId()
   // msgId3.getEntryId() == msgId1.getEntryId()
   // msgId3.getBatchIndex() == msgId1.getBatchIndex()
   ``` 
   
   is equivalent to
   
   ```java
   // CODE 2
   consumer.acknowledge(msgId1);
   consumer.acknowledge(msgId2);
   ```
   
   Worse still, `msgId1.equals(msgId3)` is true, yet if we replace `consumer.acknowledge(msgId3)` with `consumer.acknowledge(msgId1)` in `CODE 1`, `CODE 1` is no longer equivalent to `CODE 2`.
   





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058714922


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -129,121 +124,53 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
-                                                         AckType ackType, Map<String, Long> properties) {
-        if (AckType.Cumulative.equals(ackType)) {
-            if (consumer.isAckReceiptEnabled()) {
-                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
-                messageIds.forEach(messageId ->
-                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
-                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,

Review Comment:
   `AcknowledgmentsGroupingTracker` is not marked with any `InterfaceStability` annotation such as `Stable` or `Evolving`. The `pulsar-client-original` module contains "internal implementation" classes, so new classes and interfaces have been added very casually. If we worried too much about breaking changes in these "internal implementations", the code would become very hard to maintain.





[GitHub] [pulsar] github-actions[bot] commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1414646219

   The PR had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059223211


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   We should also consider cleaning up `batchMessageToAcker`:
   
   - After the seek operation
   - After the message has been redelivered (NACK or ACK timeout)
   - After the consumer reconnects to the broker? (I'm not 100% sure about this one; the cursor reset might happen on the broker side.)
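   One hypothetical way to support these cleanup points, assuming the cache stays a `ConcurrentNavigableMap` ordered by `(ledgerId, entryId)`: a full `clear()` for seek or redelivery, and `headMap(key, true).clear()` to drop every batch at or before a position. `Position` and `NavigableAckerCache` are illustrative names, and the value is simplified to a pending-message count.
   
   ```java
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;
   
   // Hypothetical (ledgerId, entryId) key, ordered like a position in the topic.
   final class Position implements Comparable<Position> {
       final long ledgerId;
       final long entryId;
   
       Position(long ledgerId, long entryId) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
       }
   
       @Override
       public int compareTo(Position o) {
           int c = Long.compare(ledgerId, o.ledgerId);
           return c != 0 ? c : Long.compare(entryId, o.entryId);
       }
   }
   
   final class NavigableAckerCache {
       // Value simplified to the number of still-unacknowledged messages in the batch.
       private final ConcurrentNavigableMap<Position, Integer> pendingByEntry = new ConcurrentSkipListMap<>();
   
       void track(long ledgerId, long entryId, int pending) {
           pendingByEntry.put(new Position(ledgerId, entryId), pending);
       }
   
       // Drops every cached batch at or before the given position (headMap view is backed by the map).
       void clearUpTo(long ledgerId, long entryId) {
           pendingByEntry.headMap(new Position(ledgerId, entryId), true).clear();
       }
   
       // Drops everything, e.g. after a seek or a redelivery.
       void clearAll() {
           pendingByEntry.clear();
       }
   
       int size() {
           return pendingByEntry.size();
       }
   
       public static void main(String[] args) {
           NavigableAckerCache cache = new NavigableAckerCache();
           cache.track(1L, 5L, 2);
           cache.track(1L, 9L, 3);
           cache.track(2L, 0L, 1);
           cache.clearUpTo(1L, 9L);
           System.out.println(cache.size()); // 1: only 2:0 remains
           cache.clearAll();
           System.out.println(cache.size()); // 0
       }
   }
   ```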





[GitHub] [pulsar] tisonkun commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058101805


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -314,7 +239,7 @@ private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<Str
         }
     }
 
-    private CompletableFuture<Void> doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {

Review Comment:
   ditto here





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058745197


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -129,121 +124,53 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
-                                                         AckType ackType, Map<String, Long> properties) {
-        if (AckType.Cumulative.equals(ackType)) {
-            if (consumer.isAckReceiptEnabled()) {
-                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
-                messageIds.forEach(messageId ->
-                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
-                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,

Review Comment:
   On second thought, this PR should be cherry-pickable into release branches, so I restored API compatibility and marked the APIs that are never used as deprecated.
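   A sketch of the general compatibility pattern, with hypothetical simplified types (`SketchAckTracker`, `SketchAckType`, `List<Long>` IDs) rather than the real tracker interface: the old signature is kept as a `@Deprecated` default method that delegates to the new one, so callers compiled against the old API keep working.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   
   // Hypothetical, simplified stand-ins; these are not the real Pulsar client interfaces.
   enum SketchAckType { Individual, Cumulative }
   
   interface SketchAckTracker {
       // New entry point: a list acknowledgment no longer takes an ack type.
       CompletableFuture<Void> addListAcknowledgment(List<Long> messageIds, Map<String, Long> properties);
   
       // Old signature kept so existing callers, including code on release branches, still compile.
       // Assumption for this sketch: the ack type is simply ignored.
       @Deprecated
       default CompletableFuture<Void> addListAcknowledgment(List<Long> messageIds, SketchAckType ackType,
                                                             Map<String, Long> properties) {
           return addListAcknowledgment(messageIds, properties);
       }
   }
   
   final class RecordingAckTracker implements SketchAckTracker {
       final List<Long> acked = new ArrayList<>();
   
       @Override
       public CompletableFuture<Void> addListAcknowledgment(List<Long> messageIds, Map<String, Long> properties) {
           acked.addAll(messageIds);
           return CompletableFuture.completedFuture(null);
       }
   
       public static void main(String[] args) {
           RecordingAckTracker tracker = new RecordingAckTracker();
           // An old-style call still works and is routed to the new method.
           tracker.addListAcknowledgment(Arrays.asList(1L, 2L), SketchAckType.Individual,
                   Collections.<String, Long>emptyMap()).join();
           System.out.println(tracker.acked); // [1, 2]
       }
   }
   ```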





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059231502


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   I didn't get it. A new map entry can only be added when a message is acknowledged:
   
   ```java
       private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
           final BatchMessageAcker acker;
           if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
               acker = batchMessageToAcker.computeIfAbsent(
   ```
   
   Why should we clean up the `batchMessageToAcker` after a seek operation?
   
   Regarding message redelivery or reconnection: if the batch message ID was not created by deserialization, the acker inside the `BatchMessageIdImpl` is not modified. If we cleaned up `batchMessageToAcker`, which only serves deserialized batch message IDs, the two kinds of IDs would behave differently.
   
   For example, assume there are 2 batch message IDs (`id0` and `id1`) of the same entry, and consider the following steps:
   1. Acknowledge `id0`
   2. Reconnection
   3. Acknowledge `id1`
   
   If they were obtained through deserialization, the entry will not be acknowledged, because step 2 cleared the cache and the `BatchMessageAcker` ends up as "XO" (`X` means not acknowledged, `O` means acknowledged).
   
   However, if they were simply saved from `message.getMessageId()`, the entry will be acknowledged, because the shared `BatchMessageAcker` becomes "OO" and is not affected by the reconnection.
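   The two paths can be simulated with plain `BitSet`s. This is a hypothetical sketch, assuming the cache is cleared on reconnection as proposed and keyed by entry ID only for brevity; a set bit means "X" (not acknowledged).
   
   ```java
   import java.util.BitSet;
   import java.util.HashMap;
   import java.util.Map;
   
   final class ReconnectScenario {
       // A two-message batch starts as "XX": both bits set, nothing acknowledged.
       static BitSet newAcker() {
           BitSet bits = new BitSet(2);
           bits.set(0, 2);
           return bits;
       }
   
       // IDs saved from message.getMessageId(): both share one acker that the reconnection does not touch.
       static boolean sharedAckerPath() {
           BitSet shared = newAcker();
           shared.clear(0);          // 1. acknowledge id0
                                     // 2. reconnection: the shared acker is unaffected
           shared.clear(1);          // 3. acknowledge id1
           return shared.isEmpty();  // "OO": the entry is acknowledged
       }
   
       // Deserialized IDs: the acker lives in a consumer-side cache, assumed to be cleared on reconnection.
       static boolean deserializedPath() {
           Map<Long, BitSet> cache = new HashMap<>();
           long entryId = 0L;
           cache.computeIfAbsent(entryId, k -> newAcker()).clear(0); // 1. acknowledge id0
           cache.clear();                                             // 2. reconnection clears the cache
           BitSet acker = cache.computeIfAbsent(entryId, k -> newAcker());
           acker.clear(1);                                            // 3. acknowledge id1
           return acker.isEmpty();                                    // "XO": index 0 looks unacknowledged again
       }
   
       public static void main(String[] args) {
           System.out.println("shared acker: " + sharedAckerPath());  // true
           System.out.println("deserialized: " + deserializedPath()); // false
       }
   }
   ```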
   
   





[GitHub] [pulsar] tisonkun commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058101605


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -267,10 +194,9 @@ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<Str
     }
 
 
-    private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
+    private void doIndividualAckAsync(MessageIdImpl messageId) {

Review Comment:
   How can an `Async` variant return `void`, especially when `doIndividualBatchAck` below returns `CompletableFuture<Void>`?





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059303705


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   @codelipenghui The key point is how the deserialized batch message IDs are used. IMO, when you use message IDs obtained from deserialization, you should not do anything else until you have processed them completely:
   1. Get some message IDs
   2. Store them in somewhere else
   3. Load them and process them
   
   This is just like the example at https://github.com/apache/pulsar/pull/19031#discussion_r1059282261; the only difference is that here the message IDs are stored in memory.
   
   Your example works because after the seek operation you didn't use the previous message IDs. Instead, you used new message IDs, even though they represent the same positions as the previous ones.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059323030


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   I think it's very hacky to acknowledge the remaining messages of a batch after a seek. As in the example I mentioned, if you consider that message loss, users can easily simulate **message loss** like this:
   
   ```java
       private Consumer<String> subscribe(String topic) throws Exception {
           return pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionName("test")
                   .subscriptionType(SubscriptionType.Shared)
                   .isAckReceiptEnabled(true)
                   .subscribe();
       }
   
       @Test
       public void testAckAfterSeek() throws Exception {
           final String topic = "testAckAfterSeek";
   
           Consumer<String> consumer = subscribe(topic);
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
                   .create();
   
           final int messages = 10;
           for (int i = 0; i < messages; i++) {
               producer.sendAsync("New message - " + i);
           }
           producer.flush();
   
           final List<MessageId> messageIdList = new ArrayList<>();
           for (int i = 0; i < messages; i++) {
               final MessageId messageId = consumer.receive().getMessageId();
               messageIdList.add(messageId);
               if (i != 4) {
                   consumer.acknowledge(messageId);
               }
           }
           consumer.seek(MessageId.earliest);
           Thread.sleep(100);
           consumer.acknowledge(messageIdList.get(4));
           consumer.close();
           consumer = subscribe(topic);
           final Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
           // NOTE: message is null
       }
   ```
   
   How would you explain to users that they must not acknowledge `messageIdList.get(4)`, or else messages will be lost? BTW, if users leave one message ID (`messageIdList.get(4)`) to acknowledge after `seek` and don't acknowledge the other message IDs, what would they expect? Would they expect the messages to be received again?
   
   In short, I think we should not handle this corner case. It's more like a deliberate "bug".





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059725292


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   `id2` can be acked first, and then the client crashes while processing message 1 (e.g. with a Shared subscription where users process messages on multiple threads). I mean that users do not skip the ack intentionally. But we can't guarantee that the messages of a batch are processed in order with a Shared subscription. Even with a Failover or Exclusive subscription, users can ack individually and process messages out of order.
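   A hypothetical sketch of the ordering issue, using a plain `BitSet` as the batch acker: worker threads acknowledge the indexes of one batch in shuffled order, and `skippedIndex` mimics a message whose processing crashed before its ack. The batch completes exactly once whatever the order, and never if one index is missing.
   
   ```java
   import java.util.ArrayList;
   import java.util.BitSet;
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   
   final class OutOfOrderAckDemo {
       // Acks the indexes of one batch from a thread pool in shuffled order, skipping skippedIndex (-1 = none).
       // Returns how many acks observed the batch becoming fully acknowledged.
       static int run(int batchSize, int threads, int skippedIndex) {
           BitSet pending = new BitSet(batchSize);
           pending.set(0, batchSize);
           AtomicInteger completions = new AtomicInteger();
           List<Integer> order = new ArrayList<>();
           for (int i = 0; i < batchSize; i++) {
               if (i != skippedIndex) {
                   order.add(i);
               }
           }
           Collections.shuffle(order); // processing order differs from batch order
           ExecutorService pool = Executors.newFixedThreadPool(threads);
           for (int index : order) {
               pool.execute(() -> {
                   synchronized (pending) { // BitSet itself is not thread-safe
                       pending.clear(index);
                       if (pending.isEmpty()) {
                           completions.incrementAndGet();
                       }
                   }
               });
           }
           pool.shutdown();
           try {
               pool.awaitTermination(10, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return completions.get();
       }
   
       public static void main(String[] args) {
           System.out.println(run(16, 4, -1)); // 1: every index acked, in any order
           System.out.println(run(16, 4, 3));  // 0: index 3 never acked, so the entry stays unacknowledged
       }
   }
   ```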
   
   > Regarding the reconnection or ack failure
   
   I pointed this one out because a server-side cursor reset can cause a reconnection. But that is not a problem this PR needs to fix.






[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059232127


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   Before https://github.com/apache/pulsar/pull/1424, the `batchMessageAckTracker` was updated for each received message:
   
   ```java
       void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
               MessageIdData messageId, ClientCnx cnx) {
           /* ... */
           // Updated on receive, before any acknowledgment happens
           batchMessageAckTracker.put(batchMessage, bitSet);
           /* ... */
       }
   ```
   
   But the `batchMessageToAcker` in this PR is updated only on acknowledgment.
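   A minimal sketch of that difference, assuming a hypothetical `LazyAckerCache` keyed by `"ledgerId:entryId"` with a `BitSet` of unacknowledged indexes as the value (not the PR's actual types): receiving messages never touches the map, an entry appears on the first acknowledgment via `computeIfAbsent`, and it is removed once every index in the batch is acknowledged.

   ```java
   import java.util.BitSet;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical model: the cache is only populated on acknowledgment, never on receive.
   final class LazyAckerCache {
       // "ledgerId:entryId" -> batch indexes that are still unacknowledged
       private final ConcurrentHashMap<String, BitSet> batchToPending = new ConcurrentHashMap<>();

       void onReceive(long ledgerId, long entryId, int batchIndex) {
           // Intentionally empty: nothing is recorded per received message.
       }

       // Returns true when every message of the entry has been acknowledged.
       boolean acknowledge(long ledgerId, long entryId, int batchIndex, int batchSize) {
           String key = ledgerId + ":" + entryId;
           BitSet pending = batchToPending.computeIfAbsent(key, k -> {
               BitSet bits = new BitSet(batchSize);
               bits.set(0, batchSize);
               return bits;
           });
           boolean complete;
           synchronized (pending) { // BitSet itself is not thread-safe
               pending.clear(batchIndex);
               complete = pending.isEmpty();
           }
           if (complete) {
               batchToPending.remove(key); // drop the entry once the batch is fully acknowledged
           }
           return complete;
       }

       int size() {
           return batchToPending.size();
       }
   }
   ```

   The map therefore only holds entries for batches that are partially acknowledged.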





[GitHub] [pulsar] BewareMyPower commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1365854800

   @congbobo184 Your comments have been addressed, PTAL again.




[GitHub] [pulsar] tisonkun commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058144788


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -267,10 +194,9 @@ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<Str
     }
 
 
-    private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
+    private void doIndividualAckAsync(MessageIdImpl messageId) {

Review Comment:
   @BewareMyPower Yeah. That sounds good.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058143874


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -267,10 +194,9 @@ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<Str
     }
 
 
-    private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
+    private void doIndividualAckAsync(MessageIdImpl messageId) {

Review Comment:
   Since this PR already includes some refactoring, I might do that in a follow-up PR.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058725134


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -549,13 +600,33 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
             return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
                     new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
         }
-        return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
+        if (ackType == AckType.Individual) {
+            onAcknowledge(messageId, null);
+        } else {
+            onAcknowledgeCumulative(messageId, null);
+        }
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);

Review Comment:
   No. This method only computes the `MessageIdImpl` to acknowledge. `incrementAndGet` modifies the atomic variable itself, whereas `getMessageIdToAcknowledge` just calculates a `MessageIdImpl` to acknowledge from the `BatchMessageIdImpl` and does not modify the `BatchMessageIdImpl` itself.
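   A minimal sketch of that distinction, under simplifying assumptions: `ToyBatchId` is a hypothetical immutable stand-in for `BatchMessageIdImpl`, the acker is a plain `BitSet` of unacknowledged indexes, and `messageIdToAcknowledge` mirrors only the individual-ack branch of `getMessageIdToAcknowledge` from the diff. The acker state changes, but the message ID passed in is never modified; the method only computes which ID to send.

   ```java
   import java.util.BitSet;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical immutable stand-in for BatchMessageIdImpl.
   final class ToyBatchId {
       final long ledgerId;
       final long entryId;
       final int batchIndex; // -1 denotes the whole entry

       ToyBatchId(long ledgerId, long entryId, int batchIndex) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
           this.batchIndex = batchIndex;
       }

       // Like toMessageIdImpl(): returns a new object for the whole entry; this one is left unchanged.
       ToyBatchId toEntryId() {
           return new ToyBatchId(ledgerId, entryId, -1);
       }

       @Override
       public String toString() {
           return batchIndex < 0 ? ledgerId + ":" + entryId : ledgerId + ":" + entryId + ":" + batchIndex;
       }
   }

   final class AckResolver {
       // Returns the ID to send to the broker, or null if nothing should be sent yet.
       static ToyBatchId messageIdToAcknowledge(ToyBatchId id, BitSet pending, boolean batchIndexAckEnabled) {
           pending.clear(id.batchIndex); // only the acker state is updated
           if (pending.isEmpty()) {
               return id.toEntryId();               // whole batch acknowledged: ack the entry
           }
           return batchIndexAckEnabled ? id : null; // otherwise a batch index ack, or nothing
       }

       public static void main(String[] args) {
           AtomicInteger counter = new AtomicInteger();
           counter.incrementAndGet(); // mutates the atomic variable itself

           BitSet pending = new BitSet();
           pending.set(0, 2);
           ToyBatchId id0 = new ToyBatchId(1, 0, 0);
           ToyBatchId first = messageIdToAcknowledge(id0, pending, false);
           ToyBatchId second = messageIdToAcknowledge(new ToyBatchId(1, 0, 1), pending, false);
           // prints "counter=1 first=null second=1:0 id0=1:0:0"
           System.out.println("counter=" + counter.get() + " first=" + first + " second=" + second + " id0=" + id0);
       }
   }
   ```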





[GitHub] [pulsar] codecov-commenter commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1366467252

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/19031?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#19031](https://codecov.io/gh/apache/pulsar/pull/19031?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f409267) into [master](https://codecov.io/gh/apache/pulsar/commit/492a9c3e44bef2334a77164afc8b033cc8f8d82f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (492a9c3) will **increase** coverage by `23.21%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #19031       +/-   ##
   =============================================
   + Coverage     47.46%   70.67%   +23.21%     
   + Complexity    10727      436    -10291     
   =============================================
     Files           711       26      -685     
     Lines         69456     2247    -67209     
     Branches       7452      245     -7207     
   =============================================
   - Hits          32964     1588    -31376     
   + Misses        32810      486    -32324     
   + Partials       3682      173     -3509     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `70.67% <ø> (+23.21%)` | :arrow_up: |
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19031?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/pulsar/client/impl/BatchMessageIdImpl.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0JhdGNoTWVzc2FnZUlkSW1wbC5qYXZh) | | |
   | [...va/org/apache/pulsar/client/impl/ConsumerBase.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVyQmFzZS5qYXZh) | | |
   | [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | | |
   | [...a/org/apache/pulsar/client/impl/MessageIdImpl.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL01lc3NhZ2VJZEltcGwuamF2YQ==) | | |
   | [...he/pulsar/client/impl/MultiTopicsConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL011bHRpVG9waWNzQ29uc3VtZXJJbXBsLmphdmE=) | | |
   | [...pl/NonPersistentAcknowledgmentGroupingTracker.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL05vblBlcnNpc3RlbnRBY2tub3dsZWRnbWVudEdyb3VwaW5nVHJhY2tlci5qYXZh) | | |
   | [...impl/PersistentAcknowledgmentsGroupingTracker.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1BlcnNpc3RlbnRBY2tub3dsZWRnbWVudHNHcm91cGluZ1RyYWNrZXIuamF2YQ==) | | |
   | [...lsar/client/impl/conf/ClientConfigurationData.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL2NvbmYvQ2xpZW50Q29uZmlndXJhdGlvbkRhdGEuamF2YQ==) | | |
   | [...apache/pulsar/common/naming/BundleSplitOption.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbW1vbi9uYW1pbmcvQnVuZGxlU3BsaXRPcHRpb24uamF2YQ==) | | |
   | [...ker/stats/prometheus/AggregatedNamespaceStats.java](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9wcm9tZXRoZXVzL0FnZ3JlZ2F0ZWROYW1lc3BhY2VTdGF0cy5qYXZh) | | |
   | ... and [675 more](https://codecov.io/gh/apache/pulsar/pull/19031/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] BewareMyPower commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1362999460

   More explanation of the refactoring parts of this PR:
   1. I don't want to handle list cumulative acknowledgment. Pulsar has never supported this operation: there is no `acknowledgeCumulative` API that accepts a list of messages.
   2. Process the `BatchMessageIdImpl` in `ConsumerImpl`. The previous code processed it mainly in `PersistentAcknowledgmentTracker`, which apparently does not follow OOP style and has many `consumer.getXXX()` calls. I don't want to write more code on top of that, e.g. calling `consumer.getBatchMessageToAcker()` everywhere.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059278128


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > However, it's the same behavior with the current master with those MessageId that have internal acker. This PR keeps the same behavior with the BatchMessageIdImpl that has the BatchMessageAcker.
   
   As I understand it, you will get the message again after the seek operation, but with a different acker. Why should the behavior be the same? Let me try to test it locally.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059643662


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   The point I insist on is that the following two code snippets should be equivalent.
   
   ```java
           // 1. Keep the message IDs returned by receive(); IDs from the same batch share one acker
           final List<BatchMessageIdImpl> msgIds = new ArrayList<>();
           for (int i = 0; i < numMessages; i++) {
               msgIds.add((BatchMessageIdImpl) consumer.receive().getMessageId());
           }
   ```
   
   ```java
           // 2. Round-trip each message ID through toByteArray()/fromByteArray() before keeping it
           final List<BatchMessageIdImpl> msgIds = new ArrayList<>();
           for (int i = 0; i < numMessages; i++) {
               final MessageIdImpl messageId = (MessageIdImpl) consumer.receive().getMessageId();
               MessageId deserialized = MessageId.fromByteArray(messageId.toByteArray());
               msgIds.add((BatchMessageIdImpl) deserialized);
           }
   ```
   
   This PR works correctly under the assumption above.
   
   Your solution to the corner cases you described is very hacky: you create message IDs that are the **same** (i.e. `equals` returns true) but expect them to achieve a different result.
   





[GitHub] [pulsar] BewareMyPower commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1365271654

   Thanks for your reviews, I'm going to address the comments soon.




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1057527970


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,51 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));

Review Comment:
   If none of the clients enable `BatchMessageAcker`, this will not happen. It only happens if `BatchMessageAcker` is used to acknowledge some messages and is then turned off before more messages are acknowledged.





[GitHub] [pulsar] BewareMyPower commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1367093292

   Your comments have been addressed, PTAL @315157973 
   
   BTW, since I restored API compatibility, I added the `release/x.y.z` labels.




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059303705


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   @codelipenghui The key point is how deserialized batch message IDs are used. IMO, when you use message IDs obtained from deserialization, you should not do anything else before processing them completely:
   1. Get some message IDs
   2. Store them in somewhere else
   3. Load them and process them
   
   It is just like the example here: https://github.com/apache/pulsar/pull/19031#discussion_r1059282261; the only difference is that the message IDs are stored in memory.
   
   Your example works because after the seek operation, you didn't use the previous message IDs. Instead, you used new message IDs, even though they represent the same positions as the previous ones.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059254154


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > Regarding the message redelivering or reconnection, if the batch message id was not created by deserialization, the acker inside the BatchMessageIdImpl would not be modified. If we only clean up the batchMessageToAcker for deserialized batch message id, the behavior would be different.
   
   If the user decides to nack the message, they should not continue to ack it. After redelivery, the newly received message comes with a new acker. But this PR introduces shared state: it looks like the newly received message, with its different acker, can still be associated with the old ack state if we don't clean up the shared state.
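   The concern can be sketched with a hypothetical `SeekScenario` (not Pulsar code): a cache keyed by `"ledgerId:entryId"` holding the unacknowledged batch indexes, plus an `onSeek()` hook that clears it only when `clearOnSeek` is true. Index 0 of a two-message batch is acknowledged, the consumer seeks back, and only index 1 of the redelivered batch is acknowledged.

   ```java
   import java.util.BitSet;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical model of stale partial-ack state surviving a seek.
   final class SeekScenario {
       // "ledgerId:entryId" -> batch indexes that are still unacknowledged
       private final Map<String, BitSet> batchToPending = new HashMap<>();
       private final boolean clearOnSeek;

       SeekScenario(boolean clearOnSeek) {
           this.clearOnSeek = clearOnSeek;
       }

       // Returns true when the whole entry would be acknowledged.
       boolean acknowledge(String entryKey, int batchIndex, int batchSize) {
           BitSet pending = batchToPending.computeIfAbsent(entryKey, k -> {
               BitSet bits = new BitSet(batchSize);
               bits.set(0, batchSize);
               return bits;
           });
           pending.clear(batchIndex);
           if (pending.isEmpty()) {
               batchToPending.remove(entryKey);
               return true;
           }
           return false;
       }

       // Hypothetical cleanup hook invoked on seek or redelivery.
       void onSeek() {
           if (clearOnSeek) {
               batchToPending.clear();
           }
       }

       // Ack index 0, seek back, then ack only index 1 of the redelivered two-message batch.
       static boolean entryAckedAfterSeek(boolean clearOnSeek) {
           SeekScenario scenario = new SeekScenario(clearOnSeek);
           scenario.acknowledge("2:0", 0, 2);
           scenario.onSeek();
           return scenario.acknowledge("2:0", 1, 2);
       }
   }
   ```

   `entryAckedAfterSeek(false)` returns `true`: the redelivered message at index 0 ends up acknowledged although the user never acknowledged it after the seek. `entryAckedAfterSeek(true)` returns `false`.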





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059276607


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > But the client will ack the whole message 2.
   
   However, that is the same behavior as the current master for message IDs that have an internal acker. This PR keeps the same behavior for a `BatchMessageIdImpl` that has a `BatchMessageAcker`.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059392558


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   Users should not continue to process/ack messages received before the seek operation. That is different from the case I provided, which is normal usage.
   
   And we should ensure that after the seek operation, we do not return messages cached before the seek operation to users. If users cache messages, or have in-flight messages, they need to guarantee that they will not ack in-flight or cached messages from before the seek operation.
   
   > BTW, if users left one message ID (messageIdList.get(4)) to acknowledge after seek and they didn't acknowledge other message IDs, what will they expect? Did they expect messages could be received again?
   
   Yes, they should receive all the messages after the new seek position no matter what happened before the seek operation.





[GitHub] [pulsar] 315157973 commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058425020


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java:
##########
@@ -105,23 +105,11 @@ public byte[] toByteArray() {
         return toByteArray(batchIndex, batchSize);
     }
 
-    public boolean ackIndividual() {
-        return acker.ackIndividual(batchIndex);
-    }
-
     public boolean ackCumulative() {
         return acker.ackCumulative(batchIndex);
     }
 
-    public int getOutstandingAcksInSameBatch() {
-        return acker.getOutstandingAcks();
-    }
-
     public int getBatchSize() {
-        return acker.getBatchSize();
-    }
-

Review Comment:
   These are APIs exposed by the client. Can they be deleted directly without any compatibility concerns?
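   One way to remove the implementation without breaking callers is to keep the deleted public methods as deprecated delegates. A minimal sketch with hypothetical stand-ins (`CompatAcker` for `BatchMessageAcker`, `CompatBatchMessageId` for `BatchMessageIdImpl`); the method names match the ones removed in the diff above, but the bodies are assumptions.

   ```java
   import java.util.BitSet;

   // Hypothetical stand-in for BatchMessageAcker.
   final class CompatAcker {
       private final BitSet pending;

       CompatAcker(int batchSize) {
           pending = new BitSet(batchSize);
           pending.set(0, batchSize); // every index starts unacknowledged
       }

       boolean ackIndividual(int batchIndex) {
           pending.clear(batchIndex);
           return pending.isEmpty();
       }

       int getOutstandingAcks() {
           return pending.cardinality();
       }
   }

   // Hypothetical stand-in for BatchMessageIdImpl that keeps the removed methods as deprecated delegates.
   final class CompatBatchMessageId {
       private final int batchIndex;
       private final CompatAcker acker;

       CompatBatchMessageId(int batchIndex, CompatAcker acker) {
           this.batchIndex = batchIndex;
           this.acker = acker;
       }

       /** @deprecated kept for compatibility; acknowledgments are expected to go through the consumer. */
       @Deprecated
       public boolean ackIndividual() {
           return acker.ackIndividual(batchIndex);
       }

       /** @deprecated kept for compatibility. */
       @Deprecated
       public int getOutstandingAcksInSameBatch() {
           return acker.getOutstandingAcks();
       }
   }
   ```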



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,52 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    @Nullable
+    private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));
+        } else {
+            acker = messageId.getAcker();
+        }
+        if (ackType == AckType.Individual) {
+            if (acker.ackIndividual(messageId.getBatchIndex())) {
+                batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
+                return messageId.toMessageIdImpl();
+            } else {
+                return conf.isBatchIndexAckEnabled() ? messageId : null;
+            }
+        } else {
+            if (acker.ackCumulative(messageId.getBatchIndex())) {
+                batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));

Review Comment:
   Shouldn't everything before this msgId be removed?
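   Because `batchMessageToAcker` is declared as a `ConcurrentNavigableMap`, one way to do that is to clear the `headMap` view up to and including the acknowledged position. A minimal sketch, assuming a hypothetical `Position` key ordered by ledger ID then entry ID (in place of `Pair<Long, Long>`) and `String` values in place of `BatchMessageAcker`; as in the diff, it would only run once `ackCumulative` reports that the entry is complete.

   ```java
   import java.util.Objects;
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;

   // Hypothetical key: ordered by ledger ID, then entry ID.
   final class Position implements Comparable<Position> {
       final long ledgerId;
       final long entryId;

       Position(long ledgerId, long entryId) {
           this.ledgerId = ledgerId;
           this.entryId = entryId;
       }

       @Override
       public int compareTo(Position other) {
           int c = Long.compare(ledgerId, other.ledgerId);
           return c != 0 ? c : Long.compare(entryId, other.entryId);
       }

       @Override
       public boolean equals(Object o) {
           if (!(o instanceof Position)) {
               return false;
           }
           Position p = (Position) o;
           return ledgerId == p.ledgerId && entryId == p.entryId;
       }

       @Override
       public int hashCode() {
           return Objects.hash(ledgerId, entryId);
       }
   }

   final class CumulativeCleanup {
       // Values simplified to String; the real map holds BatchMessageAcker instances.
       final ConcurrentNavigableMap<Position, String> batchMessageToAcker = new ConcurrentSkipListMap<>();

       // Removes the acknowledged entry and every entry ordered before it.
       void onCumulativeAck(long ledgerId, long entryId) {
           // headMap(key, true) is a live view of keys <= key; clearing it removes them from the map.
           batchMessageToAcker.headMap(new Position(ledgerId, entryId), true).clear();
       }
   }
   ```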



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -129,121 +124,53 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
-                                                         AckType ackType, Map<String, Long> properties) {
-        if (AckType.Cumulative.equals(ackType)) {
-            if (consumer.isAckReceiptEnabled()) {
-                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
-                messageIds.forEach(messageId ->
-                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
-                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,

Review Comment:
   This public API is changed directly, which breaks compatibility. If a user wraps this API and passes in other `MessageId` implementations, the old interface will no longer be usable.
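   The break comes from Java generics being invariant. A caller that holds a `List<MessageId>` cannot pass it to a method that now takes `List<MessageIdImpl>`, even if every element happens to be a `MessageIdImpl`. Below is a minimal sketch that uses hypothetical stand-in types (`Id`, `IdImpl`, `UserId`) and methods (`addImpls`, `add`) rather than Pulsar's classes. It shows one way to keep the wider signature while rejecting unsupported implementations explicitly.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class CompatSketch {
       // Hypothetical stand-ins for the MessageId interface and MessageIdImpl.
       interface Id { }

       static final class IdImpl implements Id {
           final long entryId;

           IdImpl(long entryId) {
               this.entryId = entryId;
           }
       }

       // A caller-defined implementation that is not an IdImpl.
       static final class UserId implements Id { }

       // The narrowed signature: a List<Id> cannot be passed here.
       static int addImpls(List<IdImpl> ids) {
           return ids.size();
       }

       // Hypothetical shim that keeps the wider signature, converts the
       // elements it supports and rejects the rest with a clear error.
       static int add(List<? extends Id> ids) {
           List<IdImpl> impls = new ArrayList<>(ids.size());
           for (Id id : ids) {
               if (!(id instanceof IdImpl)) {
                   throw new IllegalArgumentException(
                           "Unsupported ID type: " + id.getClass().getName());
               }
               impls.add((IdImpl) id);
           }
           return addImpls(impls);
       }

       public static void main(String[] args) {
           List<Id> ids = List.of(new IdImpl(1), new IdImpl(2));
           // addImpls(ids); // does not compile: List<Id> is not a List<IdImpl>
           System.out.println(add(ids)); // prints 2
       }
   }
   ```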



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -549,13 +600,33 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
             return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
                     new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
         }
-        return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
+        if (ackType == AckType.Individual) {
+            onAcknowledge(messageId, null);
+        } else {
+            onAcknowledgeCumulative(messageId, null);
+        }
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);

Review Comment:
   Would it be better to rename this method to `acknowledgeAndGet`, following the naming of `AtomicInteger#incrementAndGet`?
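   The suggested name follows the `xAndGet` convention of `java.util.concurrent.atomic` (for example, `AtomicInteger#incrementAndGet`): the method first mutates state and then returns the result of that mutation. Below is a minimal sketch of an acker method named in that style. It assumes a simplified `BitSet`-based acker rather than Pulsar's `BatchMessageAcker`. `BatchAckerSketch` and its methods are hypothetical.

   ```java
   import java.util.BitSet;

   public class BatchAckerSketch {
       // Bit i is set while message i of the batch is still unacknowledged.
       private final BitSet pending;

       BatchAckerSketch(int batchSize) {
           pending = new BitSet(batchSize);
           pending.set(0, batchSize);
       }

       // Mutate first, then report the new state (like incrementAndGet):
       // returns true once every message in the batch has been acknowledged.
       synchronized boolean acknowledgeAndGet(int batchIndex) {
           pending.clear(batchIndex);
           return pending.isEmpty();
       }

       // A cumulative ACK clears every index up to and including batchIndex.
       synchronized boolean acknowledgeCumulativeAndGet(int batchIndex) {
           pending.clear(0, batchIndex + 1);
           return pending.isEmpty();
       }

       public static void main(String[] args) {
           BatchAckerSketch acker = new BatchAckerSketch(3);
           System.out.println(acker.acknowledgeAndGet(0));           // false
           System.out.println(acker.acknowledgeCumulativeAndGet(2)); // true
       }
   }
   ```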



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059282261


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > You will get the message again, but with a different Acker after the seek operation.
   
   This "correct" behavior requires you to acknowledge the new messages instead of the old message IDs. For example, the following code shows the "wrong" behavior:
   
   ```java
   var msgIds = new ArrayList<MessageId>();
   for (int i = 0; i < 3; i++) {
       msgIds.add(consumer.receive().getMessageId());
   }
   consumer.acknowledge(msgIds.get(0));
   consumer.acknowledge(msgIds.get(2));
   consumer.seek(MessageId.earliest);
   // Wrong: msgIds.get(1) comes from the delivery before the seek,
   // not from the redelivered message
   consumer.acknowledge(msgIds.get(1));
   ```





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059655770


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > If users try to cache the messages or just inflight messages, users need to guarantee they will not ack the inflight or cached messages before the seek operation.
   
   Just like the rule you mentioned here, there is a rule when message IDs from deserialization are used: all message IDs should be acknowledged (for simplicity, this ignores the cumulative ACK case). IMO, if a user loads a set of message IDs and acknowledges only some of them before seeking, that is incorrect usage.
   
   > It's not the same case as I provided which is normal usage.
   
   As I've explained above, using deserialized message IDs this way is not normal usage.
   
   The wrong code users should not write:
   
   ```java
   var id0 = loadMessageId();
   consumer.acknowledge(id0); // batch index: 0, batch size: 2
   var id1 = loadMessageId(); // batch index: 1, batch size: 2
   // User does not acknowledge id1 before seek
   consumer.seek(MessageId.earliest);
   // Instead, user acknowledges the outdated id1 after seek.
   consumer.acknowledge(id1);
   ```
   
   The correct code users should write:
   
   ```java
   var id0 = loadMessageId();
   consumer.acknowledge(id0); // batch index: 0, batch size: 2
   consumer.seek(MessageId.earliest);
   storeMessageId(consumer.receive().getMessageId());  // batch index: 0, batch size: 2
   storeMessageId(consumer.receive().getMessageId());  // batch index: 1, batch size: 2
   var id1 = loadMessageId(); // batch index: 0, batch size: 2
   var id2 = loadMessageId(); // batch index: 1, batch size: 2
   consumer.acknowledge(id1);
   consumer.acknowledge(id2);
   ```
   
   ----
   
   Reconnection and ACK failures behave the same way. Once `acknowledgeAsync` has been called on all message IDs of a batch, the cached acker is removed regardless of whether the acknowledgment succeeded. If only some message IDs of a batch are acknowledged, then when users receive the messages again, they should persist the new message IDs (including those at repeated positions) and use them instead of the old ones.
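   Below is a simplified model of this cache lifecycle, not the PR's code. The `"ledgerId:entryId"` string key, the `BitSet` standing in for `BatchMessageAcker`, and the class and method names are all hypothetical. The sketch uses `ConcurrentHashMap#compute`, so creating the acker, clearing a bit and dropping the finished entry happen atomically for one key. The diff itself uses `computeIfAbsent` followed by a separate `remove`.

   ```java
   import java.util.BitSet;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class DeserializedAckCacheSketch {
       // Hypothetical cache keyed by "ledgerId:entryId"; each value tracks
       // which batch indexes are still unacknowledged.
       private final Map<String, BitSet> batchToPending = new ConcurrentHashMap<>();

       // Returns true when this call acknowledged the last pending index,
       // i.e. the whole entry can now be acknowledged to the broker.
       boolean ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
           String key = ledgerId + ":" + entryId;
           boolean[] complete = {false};
           batchToPending.compute(key, (k, pending) -> {
               BitSet p = pending;
               if (p == null) {
                   // First ACK for this entry: every index starts out pending.
                   p = new BitSet(batchSize);
                   p.set(0, batchSize);
               }
               p.clear(batchIndex);
               complete[0] = p.isEmpty();
               // Returning null removes the mapping as soon as every index is
               // acked; no broker response is awaited in this model.
               return complete[0] ? null : p;
           });
           return complete[0];
       }

       int cachedBatches() {
           return batchToPending.size();
       }

       public static void main(String[] args) {
           DeserializedAckCacheSketch cache = new DeserializedAckCacheSketch();
           System.out.println(cache.ack(3, 0, 0, 2)); // false
           System.out.println(cache.cachedBatches()); // 1
           System.out.println(cache.ack(3, 0, 1, 2)); // true
           System.out.println(cache.cachedBatches()); // 0
       }
   }
   ```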





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059286759


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   I have tried them locally.
   
   ### The first case uses the message instances to ack the messages and seeks to the earliest position after acknowledging half of them.
   
   ```java
    @Test
    public void testAckAfterSeek() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/testAckAfterSeek-" + UUID.randomUUID();
   
        @Cleanup
        org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("test")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
   
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
                .create();
   
        final int messages = 10;
        for (int i = 0; i < messages; i++) {
            producer.sendAsync("New message - " + i);
        }
        producer.flush();
   
        // Only ack the first 5 messages
        for (int i = 0; i < 5; i++) {
            Message<String> received = consumer.receive();
            log.info("[x] Received message: {}", received.getValue());
            consumer.acknowledge(received);
        }
   
        consumer.seek(MessageId.earliest);
   
        for (int i = 0; i < messages; i++) {
            Message<String> received = consumer.receive();
            log.info("[x] Received message: {}", received.getValue());
            if (i > 4) {
                consumer.acknowledge(received);
            }
        }
   
        // Wait for the ACK command to be sent to the broker.
        Thread.sleep(3000);
   
        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
        log.info("The topic internal stats : {}", Json.pretty(stats));
    }
   ```
   
   The output:
   
   ```
   2022-12-30T16:03:49,319 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1500] - [/127.0.0.1:59957] Created new producer: Producer{topic=PersistentTopic{topic=persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68}, client=/127.0.0.1:59957, producerName=test-0-0, producerId=0}
   2022-12-30T16:03:49,325 - INFO  - [pulsar-client-io-37-5:ProducerImpl@1707] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68] [test-0-0] Created producer on cnx [id: 0xbe2e8d16, L:/127.0.0.1:59957 - R:localhost/127.0.0.1:59949]
   2022-12-30T16:03:49,401 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 0
   2022-12-30T16:03:49,403 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 1
   2022-12-30T16:03:49,403 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 2
   2022-12-30T16:03:49,403 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 3
   2022-12-30T16:03:49,403 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 4
   2022-12-30T16:03:49,405 - INFO  - [main:ConsumerImpl@2144] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test] Seeking subscription to the message -1:-1:-1
   2022-12-30T16:03:49,406 - INFO  - [pulsar-io-6-2:Consumer@385] - Disconnecting consumer: Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68, name=test}, consumerId=0, consumerName=78011, address=/127.0.0.1:59957}
   2022-12-30T16:03:49,407 - INFO  - [pulsar-io-6-2:PersistentDispatcherMultipleConsumers@193] - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68, name=test}, consumerId=0, consumerName=78011, address=/127.0.0.1:59957} with pending 1 acks
   2022-12-30T16:03:49,408 - INFO  - [pulsar-io-6-2:PersistentDispatcherMultipleConsumers@200] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68 / test] All consumers removed. Subscription is disconnected
   2022-12-30T16:03:49,408 - INFO  - [pulsar-io-6-2:PersistentSubscription@783] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test] Successfully disconnected consumers from subscription, proceeding with cursor reset
   2022-12-30T16:03:49,410 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@1227] - [prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68] Initiate reset readPosition to 3:-1 on cursor test
   2022-12-30T16:03:49,411 - INFO  - [pulsar-client-io-37-5:ClientCnx@796] - [localhost/127.0.0.1:59949] Broker notification of Closed consumer: 0
   2022-12-30T16:03:49,411 - INFO  - [pulsar-client-io-37-5:ConnectionHandler@144] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68] [test] Closed connection [id: 0xbe2e8d16, L:/127.0.0.1:59957 - R:localhost/127.0.0.1:59949] -- Will try again in 0.1 s
   2022-12-30T16:03:49,415 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger 7
   2022-12-30T16:03:49,425 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-0-0:ManagedCursorImpl$30@3049] - [prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68] Updated cursor test with ledger id 7 md-position=3:-1 rd-position=3:1
   2022-12-30T16:03:49,433 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl$9@1274] - [prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68] reset readPosition to 3:-1 before current read readPosition 3:1 on cursor test
   2022-12-30T16:03:49,433 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1770] - [/127.0.0.1:59957] [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test] Reset subscription to message id -1:-1
   2022-12-30T16:03:49,433 - INFO  - [pulsar-client-io-37-5:ConsumerImpl@2147] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test] Successfully reset subscription to the message -1:-1:-1
   2022-12-30T16:03:49,512 - INFO  - [pulsar-timer-76-1:ConnectionHandler@148] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68] [test] Reconnecting after timeout
   2022-12-30T16:03:49,517 - INFO  - [pulsar-web-32-14:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [30/Dec/2022:16:03:49 +0800] "GET /lookup/v2/destination/persistent/prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68 HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 4
   2022-12-30T16:03:49,517 - INFO  - [pulsar-client-io-37-3:ConsumerImpl@780] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test] Subscribing to topic on cnx [id: 0xbe2e8d16, L:/127.0.0.1:59957 - R:localhost/127.0.0.1:59949], consumerId 0
   2022-12-30T16:03:49,518 - INFO  - [pulsar-io-6-2:ServerCnx@1072] - [/127.0.0.1:59957] Subscribing on topic persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68 / test
   2022-12-30T16:03:49,530 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@2456] - [prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68-test] Rewind from 3:-1 to 3:0
   2022-12-30T16:03:49,530 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1156] - [/127.0.0.1:59957] Created subscription on topic persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68 / test
   2022-12-30T16:03:49,531 - INFO  - [pulsar-client-io-37-5:ConsumerImpl@914] - [persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test] Subscribed to topic on localhost/127.0.0.1:59949 -- consumer: 0
   2022-12-30T16:03:49,541 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 0
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 1
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 2
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 3
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 4
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 5
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 6
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 7
   2022-12-30T16:03:49,542 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 8
   2022-12-30T16:03:49,543 - INFO  - [main:SubscriptionSeekTest@823] - [x] Received message: New message - 9
   2022-12-30T16:03:52,575 - INFO  - [pulsar-web-32-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [30/Dec/2022:16:03:52 +0800] "GET /admin/persistent/prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68/internalStats?metadata=false HTTP/1.1" 200 966 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 20
   2022-12-30T16:03:52,596 - INFO  - [main:SubscriptionSeekTest@833] - The topic internal stats : {
     "entriesAddedCounter" : 1,
     "numberOfEntries" : 1,
     "totalSize" : 275,
     "currentLedgerEntries" : 1,
     "currentLedgerSize" : 275,
     "lastLedgerCreatedTimestamp" : "2022-12-30T16:03:49.057+08:00",
     "waitingCursorsCount" : 2,
     "pendingAddEntriesCount" : 0,
     "lastConfirmedEntry" : "3:0",
     "state" : "LedgerOpened",
     "ledgers" : [ {
       "ledgerId" : 3,
       "entries" : 0,
       "size" : 0,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "cursors" : {
       "test" : {
         "markDeletePosition" : "3:-1",
         "readPosition" : "3:1",
         "waitingReadOp" : true,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 0,
         "cursorLedger" : 7,
         "cursorLedgerLastEntry" : 1,
         "individuallyDeletedMessages" : "[]",
         "lastLedgerSwitchTimestamp" : "2022-12-30T16:03:49.184+08:00",
         "state" : "Open",
         "active" : true,
         "numberOfEntriesSinceFirstNotAckedMessage" : 2,
         "totalNonContiguousDeletedMessagesRange" : 0,
         "subscriptionHavePendingRead" : true,
         "subscriptionHavePendingReplayRead" : false,
         "properties" : { }
       }
     },
     "schemaLedgers" : [ ],
     "compactedLedger" : {
       "ledgerId" : -1,
       "entries" : -1,
       "size" : -1,
       "offloaded" : false,
       "underReplicated" : false
     }
   }
   ```
   
   ### The second case uses the deserialized message IDs to ack the messages and seeks to the earliest position after acknowledging half of them.
   
   ```java
       @Test
       public void testAckAfterSeek() throws Exception {
           final String topic = "persistent://prop/use/ns-abc/testAckAfterSeek-" + UUID.randomUUID();
   
           @Cleanup
           org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionName("test")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
                   .create();
   
           final int messages = 10;
           for (int i = 0; i < messages; i++) {
               producer.sendAsync("New message - " + i);
           }
           producer.flush();
   
           // Only ack the first 5 messages
           for (int i = 0; i < 5; i++) {
               Message<String> received = consumer.receive();
               log.info("[x] Received message: {}", received.getValue());
               MessageId deserialized = MessageId.fromByteArray(received.getMessageId().toByteArray());
               consumer.acknowledge(deserialized);
           }
   
           consumer.seek(MessageId.earliest);
   
           for (int i = 0; i < messages; i++) {
               Message<String> received = consumer.receive();
               log.info("[x] Received message: {}", received.getValue());
               if (i > 4) {
                   MessageId deserialized = MessageId.fromByteArray(received.getMessageId().toByteArray());
                   consumer.acknowledge(deserialized);
               }
           }
   
           // Wait for the ACK command to be sent to the broker.
           Thread.sleep(3000);
   
           PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
           log.info("The topic internal stats : {}", Json.pretty(stats));
       }
   ```
   
   The output:
   
   ```
   2022-12-30T16:11:32,926 - INFO  - [pulsar-client-io-37-5:ProducerImpl@1707] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633] [test-0-0] Created producer on cnx [id: 0x18c45e74, L:/127.0.0.1:60303 - R:localhost/127.0.0.1:60295]
   2022-12-30T16:11:32,996 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 0
   2022-12-30T16:11:32,999 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 1
   2022-12-30T16:11:32,999 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 2
   2022-12-30T16:11:33,000 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 3
   2022-12-30T16:11:33,000 - INFO  - [main:SubscriptionSeekTest@815] - [x] Received message: New message - 4
   2022-12-30T16:11:33,002 - INFO  - [main:ConsumerImpl@2248] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test] Seeking subscription to the message -1:-1:-1
   2022-12-30T16:11:33,003 - INFO  - [pulsar-io-6-2:Consumer@385] - Disconnecting consumer: Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633, name=test}, consumerId=0, consumerName=aec48, address=/127.0.0.1:60303}
   2022-12-30T16:11:33,005 - INFO  - [pulsar-io-6-2:PersistentDispatcherMultipleConsumers@193] - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633, name=test}, consumerId=0, consumerName=aec48, address=/127.0.0.1:60303} with pending 1 acks
   2022-12-30T16:11:33,006 - INFO  - [pulsar-io-6-2:PersistentDispatcherMultipleConsumers@200] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633 / test] All consumers removed. Subscription is disconnected
   2022-12-30T16:11:33,006 - INFO  - [pulsar-io-6-2:PersistentSubscription@783] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test] Successfully disconnected consumers from subscription, proceeding with cursor reset
   2022-12-30T16:11:33,008 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@1227] - [prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633] Initiate reset readPosition to 3:-1 on cursor test
   2022-12-30T16:11:33,009 - INFO  - [pulsar-client-io-37-5:ClientCnx@796] - [localhost/127.0.0.1:60295] Broker notification of Closed consumer: 0
   2022-12-30T16:11:33,009 - INFO  - [pulsar-client-io-37-5:ConnectionHandler@144] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633] [test] Closed connection [id: 0x18c45e74, L:/127.0.0.1:60303 - R:localhost/127.0.0.1:60295] -- Will try again in 0.1 s
   2022-12-30T16:11:33,014 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger 7
   2022-12-30T16:11:33,025 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-11-0:ManagedCursorImpl$30@3049] - [prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633] Updated cursor test with ledger id 7 md-position=3:-1 rd-position=3:1
   2022-12-30T16:11:33,032 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl$9@1274] - [prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633] reset readPosition to 3:-1 before current read readPosition 3:1 on cursor test
   2022-12-30T16:11:33,032 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1770] - [/127.0.0.1:60303] [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test] Reset subscription to message id -1:-1
   2022-12-30T16:11:33,033 - INFO  - [pulsar-client-io-37-5:ConsumerImpl@2251] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test] Successfully reset subscription to the message -1:-1:-1
   2022-12-30T16:11:33,111 - INFO  - [pulsar-timer-76-1:ConnectionHandler@148] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633] [test] Reconnecting after timeout
   2022-12-30T16:11:33,118 - INFO  - [pulsar-web-32-14:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [30/Dec/2022:16:11:33 +0800] "GET /lookup/v2/destination/persistent/prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633 HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 6
   2022-12-30T16:11:33,118 - INFO  - [pulsar-client-io-37-3:ConsumerImpl@884] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test] Subscribing to topic on cnx [id: 0x18c45e74, L:/127.0.0.1:60303 - R:localhost/127.0.0.1:60295], consumerId 0
   2022-12-30T16:11:33,119 - INFO  - [pulsar-io-6-2:ServerCnx@1072] - [/127.0.0.1:60303] Subscribing on topic persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633 / test
   2022-12-30T16:11:33,133 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@2456] - [prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633-test] Rewind from 3:-1 to 3:0
   2022-12-30T16:11:33,134 - INFO  - [mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1156] - [/127.0.0.1:60303] Created subscription on topic persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633 / test
   2022-12-30T16:11:33,135 - INFO  - [pulsar-client-io-37-5:ConsumerImpl@1018] - [persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test] Subscribed to topic on localhost/127.0.0.1:60295 -- consumer: 0
   2022-12-30T16:11:33,145 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 0
   2022-12-30T16:11:33,145 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 1
   2022-12-30T16:11:33,145 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 2
   2022-12-30T16:11:33,145 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 3
   2022-12-30T16:11:33,145 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 4
   2022-12-30T16:11:33,146 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 5
   2022-12-30T16:11:33,146 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 6
   2022-12-30T16:11:33,146 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 7
   2022-12-30T16:11:33,147 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 8
   2022-12-30T16:11:33,147 - INFO  - [main:SubscriptionSeekTest@824] - [x] Received message: New message - 9
   2022-12-30T16:11:36,182 - INFO  - [pulsar-web-32-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - - [30/Dec/2022:16:11:36 +0800] "GET /admin/persistent/prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633/internalStats?metadata=false HTTP/1.1" 200 965 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 23
   2022-12-30T16:11:36,209 - INFO  - [main:SubscriptionSeekTest@835] - The topic internal stats : {
     "entriesAddedCounter" : 1,
     "numberOfEntries" : 1,
     "totalSize" : 275,
     "currentLedgerEntries" : 1,
     "currentLedgerSize" : 275,
     "lastLedgerCreatedTimestamp" : "2022-12-30T16:11:32.628+08:00",
     "waitingCursorsCount" : 1,
     "pendingAddEntriesCount" : 0,
     "lastConfirmedEntry" : "3:0",
     "state" : "LedgerOpened",
     "ledgers" : [ {
       "ledgerId" : 3,
       "entries" : 0,
       "size" : 0,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "cursors" : {
       "test" : {
         "markDeletePosition" : "3:0",
         "readPosition" : "3:1",
         "waitingReadOp" : true,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 1,
         "cursorLedger" : 7,
         "cursorLedgerLastEntry" : 2,
         "individuallyDeletedMessages" : "[]",
         "lastLedgerSwitchTimestamp" : "2022-12-30T16:11:32.772+08:00",
         "state" : "Open",
         "active" : true,
         "numberOfEntriesSinceFirstNotAckedMessage" : 1,
         "totalNonContiguousDeletedMessagesRange" : 0,
         "subscriptionHavePendingRead" : true,
         "subscriptionHavePendingReplayRead" : false,
         "properties" : { }
       }
     },
     "schemaLedgers" : [ ],
     "compactedLedger" : {
       "ledgerId" : -1,
       "entries" : -1,
       "size" : -1,
       "offloaded" : false,
       "underReplicated" : false
     }
   }
   
   ```
   
   The topic internal stats show that the batch is not acknowledged when the message instances are used (`markDeletePosition` stays at `3:-1`), but it is acknowledged when the deserialized message IDs are used (`markDeletePosition` moves to `3:0`). Hope I didn't miss any details 😁
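   The two outcomes can be reproduced with a toy model. It assumes, as the stats show (`numberOfEntries: 1`), that all 10 messages sit in a single batch entry, and it models each acker as a `BitSet` of pending indexes. `SeekAckModelSketch` and its methods are hypothetical, and the model ignores everything else the client does.

   ```java
   import java.util.BitSet;

   public class SeekAckModelSketch {
       // Simplified acker: a set bit means that batch index is still pending.
       static BitSet newAcker(int batchSize) {
           BitSet pending = new BitSet(batchSize);
           pending.set(0, batchSize);
           return pending;
       }

       // Case 1 (message instances): each delivery of the batch carries its own
       // acker, so the ACKs before and after the seek land in different ackers.
       static boolean entryAckedWithMessageInstances(int batchSize, int ackedBeforeSeek) {
           BitSet firstDelivery = newAcker(batchSize);
           firstDelivery.clear(0, ackedBeforeSeek);
           BitSet redelivery = newAcker(batchSize); // fresh acker after the seek
           redelivery.clear(ackedBeforeSeek, batchSize);
           return firstDelivery.isEmpty() || redelivery.isEmpty();
       }

       // Case 2 (deserialized IDs): one cached acker per (ledger, entry) is
       // shared by the ACKs before and after the seek.
       static boolean entryAckedWithDeserializedIds(int batchSize, int ackedBeforeSeek) {
           BitSet shared = newAcker(batchSize);
           shared.clear(0, ackedBeforeSeek);
           shared.clear(ackedBeforeSeek, batchSize);
           return shared.isEmpty();
       }

       public static void main(String[] args) {
           System.out.println(entryAckedWithMessageInstances(10, 5)); // false
           System.out.println(entryAckedWithDeserializedIds(10, 5));  // true
       }
   }
   ```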





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1056136563


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -573,7 +644,26 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
             return doTransactionAcknowledgeForResponse(messageIdList, ackType, null,
                     properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
         } else {
-            return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
+            for (MessageId messageId : messageIdList) {
+                checkArgument(messageId instanceof MessageIdImpl);

Review Comment:
   This is already checked at line 631; maybe the check at line 631 can be deleted.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -549,13 +599,34 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
             return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
                     new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
         }
-        return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
+        if (ackType == AckType.Individual) {
+            onAcknowledge(messageId, null);
+        } else {
+            onAcknowledgeCumulative(messageId, null);
+        }
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);
+            if (messageIdImpl == null) {
+                return CompletableFuture.completedFuture(null);
+            } else if (messageIdImpl instanceof BatchMessageIdImpl) {
+                return acknowledgmentsGroupingTracker.addBatchIndexAck(
+                        (BatchMessageIdImpl) messageIdImpl, ackType, properties);
+            } else {
+                processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getOriginalBatchSize());
+                return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties);
+            }
+        } else {
+            MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
+            processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1);
+            return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties);
+        }
     }
 
     @Override
     protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
                                                     Map<String, Long> properties, TransactionImpl txn) {
-
+        List<MessageIdImpl> messageIdListToAck = new ArrayList<>();

Review Comment:
   Move this down to after the `checkArgument(messageId instanceof MessageIdImpl);` check below.





[GitHub] [pulsar] michaeljmarshall commented on pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by "michaeljmarshall (via GitHub)" <gi...@apache.org>.
michaeljmarshall commented on PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#issuecomment-1610271944

   As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059642489


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > Users should not continue to process/ack the messages before the seek operation.
   
   This implicit rule is never documented, and even if it were, it could be a burden to users.
   
   Assume there is a set of message IDs and the following steps:
   1. Acknowledge some message IDs.
   2. Perform some operations, e.g. `seek`.
   3. Acknowledge the other message IDs.
   
   ```java
   consumer.acknowledge(msgId1);
   consumer.doSomething(); // pseudo code
   consumer.acknowledge(msgId2);
   ```
   
   Under this rule, the steps above could have a different result from acknowledging the whole message ID set:
   
   ```java
   consumer.acknowledge(msgId1);
   consumer.acknowledge(msgId2);
   ```
   
   More confusingly, under this rule the consumer can acknowledge another message ID that has the same position as `msgId1` to get the same result, i.e.
   
   ```java
   // CODE 1
   consumer.acknowledge(msgId1);
   consumer.doSomething();
   consumer.acknowledge(msgId2);
   consumer.acknowledge(msgId3);
   // msgId3.getLedgerId() == msgId1.getLedgerId()
   // msgId3.getEntryId() == msgId1.getEntryId()
   // msgId3.getBatchIndex() == msgId1.getBatchIndex()
   ``` 
   
   is equivalent to
   
   ```java
   // CODE 2
   consumer.acknowledge(msgId1);
   consumer.acknowledge(msgId2);
   ```
   
   Even more confusing: `msgId1.equals(msgId3)` is true, yet if we replace `consumer.acknowledge(msgId3)` with `consumer.acknowledge(msgId1)` in `CODE 1`, `CODE 1` is no longer equivalent to `CODE 2`.
   





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1057114168


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,51 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));

Review Comment:
   Possible memory leak here, e.g.:
   
   ```java
   // msgId: {batchSize = 2, batchIndex = 1}, and {batchIndex = 0}
   // of the same batch has already been acknowledged before.
   MessageId deserialized = MessageId.fromByteArray(msgId.toByteArray());
   consumer.acknowledge(deserialized);
   // The acker cached in the map will then never be released.
   ```
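The leak described above can be reproduced with a small single-threaded model, assuming the map behaves as the diff suggests: one entry per `(ledgerId, entryId)`, created with every batch index pending, cleared per index, and removed only when no index is left. `BatchAckerMap`, `ackDeserialized`, and `cachedAckers` are hypothetical names for illustration, and `java.util.BitSet` stands in for `BatchMessageAcker`; this is a sketch, not Pulsar's implementation.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded model of ConsumerImpl#batchMessageToAcker.
// java.util.BitSet stands in for BatchMessageAcker; this is not Pulsar's API.
class BatchAckerMap {
    // Key: "ledgerId:entryId"; value: batch indexes not yet acknowledged.
    private final Map<String, BitSet> ackers = new HashMap<>();

    // Models acknowledging a deserialized ID (one whose acker is disabled).
    // Returns true when the whole batch is acknowledged, i.e. when an
    // entry-level ACK would be sent and the cached acker is released.
    boolean ackDeserialized(long ledgerId, long entryId, int batchIndex, int batchSize) {
        String key = ledgerId + ":" + entryId;
        BitSet pending = ackers.computeIfAbsent(key, k -> {
            BitSet bits = new BitSet(batchSize);
            bits.set(0, batchSize); // every index starts unacknowledged
            return bits;
        });
        pending.clear(batchIndex);
        if (pending.isEmpty()) {
            ackers.remove(key); // the only place an entry is released
            return true;
        }
        return false;
    }

    // Number of cached ackers, i.e. batches still waiting for more ACKs.
    int cachedAckers() {
        return ackers.size();
    }
}
```

For the case in the comment, `new BatchAckerMap().ackDeserialized(1L, 1L, 1, 2)` returns `false` and leaves one cached acker, because index 0 was acknowledged without going through the map and nothing will ever clear it there. When both indexes of a batch are acknowledged through the map, the second call returns `true` and the entry is removed.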





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059252347


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > Why should we clean up the batchMessageToAcker after a seek operation?
   
   The seek API makes the consumer consume from a new start message ID, which can be an earlier position.
   Suppose you have 3 batch messages.
   
   0:(0,1,2,3), 
   1:(0,1,2), 
   2:(0,1,2,3,4,5)
   
   Messages 0, 1, and 2:(0,2,4) are acknowledged. Then the consumer seeks to the earliest position.
   The consumer will then receive messages 0, 1, and 2 again.
   Because 2:(0,2,4) is still recorded in `batchMessageToAcker`, if only 2:(1,3,5) is acknowledged after the seek operation,
   the client will acknowledge the whole message 2.
   
   From a user's perspective, this can be data loss. We should guarantee at-least-once semantics after the seek operation.
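The seek scenario above can be replayed with a small single-threaded model, assuming partial batch state is kept per `(ledgerId, entryId)` as in the diff. `SeekAwareAckerMap`, `onSeek()`, and `SeekScenario` are hypothetical names; `onSeek()` models the cleanup argued for in this comment, not confirmed Pulsar behavior. Message 2 is assumed to be entry `0:2` with a batch size of 6.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-threaded model of batchMessageToAcker for one consumer.
class SeekAwareAckerMap {
    // Key: "ledgerId:entryId"; value: batch indexes not yet acknowledged.
    private final Map<String, BitSet> pending = new HashMap<>();

    // Returns true if this ACK completes the batch (an entry-level ACK would be sent).
    boolean ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
        String key = ledgerId + ":" + entryId;
        BitSet bits = pending.computeIfAbsent(key, k -> {
            BitSet b = new BitSet(batchSize);
            b.set(0, batchSize);
            return b;
        });
        bits.clear(batchIndex);
        if (bits.isEmpty()) {
            pending.remove(key);
            return true;
        }
        return false;
    }

    // Hypothetical hook: drop partial batch state when the consumer seeks, so
    // indexes acknowledged before the seek do not count afterwards.
    void onSeek() {
        pending.clear();
    }
}

class SeekScenario {
    // Replays message 2 (assumed entry 0:2, batch size 6) and reports whether
    // the whole batch ends up acknowledged after acking only 1, 3, 5 post-seek.
    static boolean wholeBatchAcked(boolean clearOnSeek) {
        SeekAwareAckerMap map = new SeekAwareAckerMap();
        for (int i : new int[] {0, 2, 4}) {
            map.ack(0L, 2L, i, 6); // acknowledged before the seek
        }
        if (clearOnSeek) {
            map.onSeek();
        }
        boolean done = false;
        for (int i : new int[] {1, 3, 5}) {
            done = map.ack(0L, 2L, i, 6); // acknowledged after seeking to earliest
        }
        return done;
    }
}
```

Without the cleanup, `wholeBatchAcked(false)` returns `true`: the stale bits for 0, 2, and 4 let the post-seek ACKs of 1, 3, and 5 complete the batch. With the cleanup, `wholeBatchAcked(true)` returns `false`, so indexes 0, 2, and 4 stay pending until they are acknowledged again.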





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059392558


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   Users should not continue to process/ack the messages before the seek operation. That is not the same as the case I provided, which is normal usage.
   
   And we should ensure that, after the seek operation, we do not return messages cached before the seek operation to users. If users cache messages or just have in-flight messages, they need to guarantee that they will not ack the in-flight or cached messages from before the seek operation.
   
   > BTW, if users left one message ID (messageIdList.get(4)) to acknowledge after seek and they didn't acknowledge other message IDs, what will they expect? Did they expect messages could be received again?
   
   Yes, they should receive all the messages after the new seek position no matter what happened before the seek operation.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1057511617


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,51 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));

Review Comment:
   It's hard to handle this case if you mean that some `MessageId`s are obtained from received messages while other `MessageId`s are created from `fromByteArray`.
   
   This PR assumes that when users acknowledge a `MessageId` that is created from `fromByteArray`, the other `MessageId` instances to acknowledge are also created from `fromByteArray`. I'm wondering in which case a user would mix different ways of creating `MessageId`s.
   
   ```java
   var msg = consumer.receive(); // batch index: 0, batch size: 2
   consumer.acknowledge(msg);
   // Why not receive and acknowledge the next msg directly, instead of:
   var msgId = MessageId.fromByteArray(readMessageIdFromDataBase()); // batch index: 1, batch size: 2
   consumer.acknowledge(msgId);
   ```





[GitHub] [pulsar] tisonkun commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1058110192


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,51 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {

Review Comment:
   ```suggestion
       @Nullable
       private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
   ```





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19031: [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1057527970


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -529,6 +534,51 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         return result;
     }
 
+    private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
+        if (ackType == AckType.Individual) {
+            stats.incrementNumAcksSent(numMessages);
+            unAckedMessageTracker.remove(messageId);
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.remove(messageId);
+            }
+        } else {
+            stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
+        }
+    }
+
+    private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
+        final BatchMessageAcker acker;
+        if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
+            acker = batchMessageToAcker.computeIfAbsent(
+                    Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
+                    __ -> BatchMessageAcker.newAcker(messageId.getBatchSize()));

Review Comment:
   If no client enables `BatchMessageAcker`, this will not happen.


