You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/25 12:43:11 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request, #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

BewareMyPower opened a new pull request, #17833:
URL: https://github.com/apache/pulsar/pull/17833

   ### Motivation
   
   When I read the acknowledge related code, I found much repeat code and the inconsistent code style. Generally we should handle the following cases for a given message ID:
   1. The message ID doesn't contain a valid batch index, i.e. `messageId instanceof BatchMessageIdImpl` returns false.
   2. Otherwise, it's a `BatchMessageIdImpl`. The behavior is determined by whether the batch index ACK is enabled.
   
   For individual ACKs, the stats and unacked message tracker must be updated according to the type of message ID. Since `MessageIdImpl` is used as the key, for a `BatchMessageIdImpl`, we must create a `MessageIdImpl` from it as the key of these hash maps.
   
   The existing methods `modifyBatchMessageIdAndStatesInConsumer` and `modifyMessageIdStatesInConsumer` are confused. So I decide to make the code simple and clear.
   
   ### Modifications
   
   - Add a `getMessageIdImpl` method to `BatchMessageIdImpl` to create a `MessageIdImpl` object from it.
   - Add an `addAcknowledgement` overload to accept an extra `BatchMessageIdImpl` argument, which is used to compute the number of ACK commands sent and perform batch related ACKs. The `MessageIdImpl` argument is used to update the DLQ and unacked tracker.
   - Add a `addIndividualAcknowledgement` method for individual ACKs. This method accept two functions to handle ACK for batch and non-batch message IDs because when a list of message IDs are acknowledeged, `doIndividualAckAsync` and `doIndividualBatchAck` will be called instead of `doIndividualAck` and `doIndividualBatchAck`.
   
   ### Documentation
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/4


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r980259633


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ac
                                                      Map<String, Long> properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
-        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+        consumer.onAcknowledge(msgId, null);

Review Comment:
   I have fixed it and add some tests to avoid regression, PTAL again. @lhotari 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979691065


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ac
                                                      Map<String, Long> properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
-        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+        consumer.onAcknowledge(msgId, null);

Review Comment:
   Good catch! I will fix it soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979686788


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ac
                                                      Map<String, Long> properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
-        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+        consumer.onAcknowledge(msgId, null);

Review Comment:
   there's a change in behavior in calling `onAcknowledge`. Previous, the original message id instance got passed. Now some information is potentially lost in the case of a batch message id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r982418000


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ac
                                                      Map<String, Long> properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
-        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+        consumer.onAcknowledge(msgId, null);

Review Comment:
   LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#issuecomment-1263188224

   @eolivelli @Shoothzj Could you take a second look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979536545


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java:
##########
@@ -147,6 +147,12 @@ public MessageIdImpl prevBatchMessageId() {
             ledgerId, entryId - 1, partitionIndex);
     }
 
+    // MessageIdImpl is widely used as the key of a hash map, in this case, we should convert the batch message id to
+    // have the correct hash code.
+    public MessageIdImpl getMessageIdImpl() {

Review Comment:
   Good suggestion. `get` usually means returning an internal field (`A` -> `A.x`), while `to` usually means the conversion (`A -> B`). Addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979458378


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java:
##########
@@ -147,6 +147,12 @@ public MessageIdImpl prevBatchMessageId() {
             ledgerId, entryId - 1, partitionIndex);
     }
 
+    // MessageIdImpl is widely used as the key of a hash map, in this case, we should convert the batch message id to
+    // have the correct hash code.
+    public MessageIdImpl getMessageIdImpl() {

Review Comment:
   I would name this 'toMessageIdImpl()'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower merged pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #17833:
URL: https://github.com/apache/pulsar/pull/17833


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#discussion_r979703073


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java:
##########
@@ -183,67 +187,57 @@ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ac
                                                      Map<String, Long> properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
-        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+        consumer.onAcknowledge(msgId, null);

Review Comment:
   BTW, I think the current test for the interceptor are not enough, I will add the related tests as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on pull request #17833: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs

Posted by GitBox <gi...@apache.org>.
lhotari commented on PR #17833:
URL: https://github.com/apache/pulsar/pull/17833#issuecomment-1260929648

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org