You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/10/06 13:05:19 UTC

[pulsar] branch master updated: [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 83309edab53 [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)
83309edab53 is described below

commit 83309edab53e090c4126dca8a46b6a5499a3b257
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 6 21:05:10 2022 +0800

    [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833)
---
 .../client/impl/ConsumerAckResponseTest.java       | 100 --------
 .../apache/pulsar/client/impl/ConsumerAckTest.java | 256 +++++++++++++++++++++
 .../pulsar/client/impl/BatchMessageIdImpl.java     |   6 +
 .../PersistentAcknowledgmentsGroupingTracker.java  | 139 +++++------
 4 files changed, 333 insertions(+), 168 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
deleted file mode 100644
index f86bbabdd88..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.fail;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker-impl")
-public class ConsumerAckResponseTest extends ProducerConsumerBase {
-
-    private TransactionImpl transaction;
-
-    @BeforeClass(alwaysRun = true)
-    public void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-        transaction = mock(TransactionImpl.class);
-        doReturn(1L).when(transaction).getTxnIdLeastBits();
-        doReturn(1L).when(transaction).getTxnIdMostBits();
-        doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
-        CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
-        doNothing().when(transaction).registerAckOp(any());
-        doReturn(true).when(transaction).checkIfOpen(any());
-        doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
-
-        Thread.sleep(1000 * 3);
-    }
-
-    @AfterClass(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @Test
-    public void testAckResponse() throws PulsarClientException, InterruptedException {
-        String topic = "testAckResponse";
-        @Cleanup
-        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
-                .topic(topic)
-                .enableBatching(false)
-                .create();
-        @Cleanup
-        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("sub")
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscribe();
-        producer.send(1);
-        producer.send(2);
-        try {
-            consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
-            fail();
-        } catch (ExecutionException e) {
-            Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
-        }
-        Message<Integer> message = consumer.receive();
-
-        try {
-            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
-            fail();
-        } catch (ExecutionException e) {
-            Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
-        }
-    }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
new file mode 100644
index 00000000000..ea2815641f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class ConsumerAckTest extends ProducerConsumerBase {
+
+    private TransactionImpl transaction;
+    private PulsarClient clientWithStats;
+
+    @BeforeClass(alwaysRun = true)
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        this.clientWithStats = newPulsarClient(lookupUrl.toString(), 30);
+        transaction = mock(TransactionImpl.class);
+        doReturn(1L).when(transaction).getTxnIdLeastBits();
+        doReturn(1L).when(transaction).getTxnIdMostBits();
+        doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
+        CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
+        doNothing().when(transaction).registerAckOp(any());
+        doReturn(true).when(transaction).checkIfOpen(any());
+        doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        this.clientWithStats.close();
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAckResponse() throws PulsarClientException, InterruptedException {
+        String topic = "testAckResponse";
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscribe();
+        producer.send(1);
+        producer.send(2);
+        try {
+            consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
+        }
+        Message<Integer> message = consumer.receive();
+
+        try {
+            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
+        }
+    }
+
+    @Test
+    public void testIndividualAck() throws Exception {
+        @Cleanup AckTestData data = prepareDataForAck("test-individual-ack");
+        for (MessageId messageId : data.messageIds) {
+            data.consumer.acknowledge(messageId);
+        }
+        assertEquals(data.interceptor.individualAckedMessageIdList, data.messageIds);
+        assertEquals(data.consumer.getStats().getNumAcksSent(), data.size());
+        assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty());
+    }
+
+    @Test
+    public void testIndividualAckList() throws Exception {
+        @Cleanup AckTestData data = prepareDataForAck("test-individual-ack-list");
+        data.consumer.acknowledge(data.messageIds);
+        assertEquals(data.interceptor.individualAckedMessageIdList, data.messageIds);
+        assertEquals(data.consumer.getStats().getNumAcksSent(), data.size());
+        assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty());
+    }
+
+    @Test
+    public void testCumulativeAck() throws Exception {
+        @Cleanup AckTestData data = prepareDataForAck("test-cumulative-ack");
+        System.out.println(data.size());
+        data.consumer.acknowledgeCumulative(data.messageIds.get(data.size() - 1));
+        assertEquals(data.interceptor.cumulativeAckedMessageIdList.get(0),
+                data.messageIds.get(data.messageIds.size() - 1));
+        assertEquals(data.consumer.getStats().getNumAcksSent(), 2);
+        assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty());
+    }
+
+    // Send 1 non-batched message, then send N-1 messages that are in the same batch
+    private AckTestData prepareDataForAck(String topic) throws PulsarClientException {
+        final int numMessages = 10;
+        @Cleanup Producer<String> batchProducer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(numMessages - 1)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .create();
+        @Cleanup Producer<String> nonBatchProducer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        AckStatsInterceptor interceptor = new AckStatsInterceptor();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) clientWithStats.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName("sub").intercept(interceptor).ackTimeout(10, TimeUnit.SECONDS).subscribe();
+
+        nonBatchProducer.send("msg-0");
+        for (int i = 1; i < numMessages; i++) {
+            batchProducer.sendAsync("msg-" + i);
+        }
+        List<MessageId> messageIds = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messageIds.add(message.getMessageId());
+        }
+        MessageId firstEntryMessageId = messageIds.get(0);
+        MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
+        // Verify that messages 2 to N are all in the same entry
+        for (int i = 2; i < messageIds.size(); i++) {
+            assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
+        }
+
+        assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
+        assertTrue(interceptor.cumulativeAckedMessageIdList.isEmpty());
+        assertEquals(consumer.getStats().getNumAcksSent(), 0);
+        assertNotNull(consumer.getUnAckedMessageTracker().messageIdPartitionMap);
+        assertEquals(consumer.getUnAckedMessageTracker().messageIdPartitionMap.keySet(),
+                Sets.newHashSet(firstEntryMessageId, secondEntryMessageId));
+        return new AckTestData(consumer, interceptor, messageIds);
+    }
+
+    // Send 10 messages: the 1st message is a non-batched message, and the other messages are in the same batch
+    @AllArgsConstructor
+    private static class AckTestData implements Closeable {
+
+        private final ConsumerImpl<String> consumer;
+        private final AckStatsInterceptor interceptor;
+        private final List<MessageId> messageIds;
+
+        public int size() {
+            return messageIds.size();
+        }
+
+        @Override
+        public void close() throws IOException {
+            interceptor.close();
+            consumer.close();
+        }
+    }
+
+    private static class AckStatsInterceptor implements ConsumerInterceptor<String> {
+
+        private final List<MessageId> individualAckedMessageIdList = new CopyOnWriteArrayList<>();
+        private final List<MessageId> cumulativeAckedMessageIdList = new CopyOnWriteArrayList<>();
+
+        @Override
+        public void close() {
+            // No ops
+        }
+
+        @Override
+        public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+            return message;
+        }
+
+        @Override
+        public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable exception) {
+            if (exception != null) {
+                log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId);
+                return;
+            }
+            individualAckedMessageIdList.add(messageId);
+        }
+
+        @Override
+        public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable exception) {
+            if (exception != null) {
+                log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId);
+                return;
+            }
+            cumulativeAckedMessageIdList.add(messageId);
+        }
+
+        @Override
+        public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+            // No ops
+        }
+
+        @Override
+        public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+            // No ops
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index 104d36b4b2f..ee25d504cf9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -147,6 +147,12 @@ public class BatchMessageIdImpl extends MessageIdImpl {
             ledgerId, entryId - 1, partitionIndex);
     }
 
+    // MessageIdImpl is widely used as the key of a hash map. In that case, convert the batch message id to a plain
+    // MessageIdImpl so that the key has the correct hash code.
+    public MessageIdImpl toMessageIdImpl() {
+        return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+    }
+
     public BatchMessageAcker getAcker() {
         return acker;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index ae1ac5f7649..f7f43076466 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -35,6 +35,8 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import javax.annotation.Nullable;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
@@ -162,18 +164,20 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
     private void addListAcknowledgment(List<MessageId> messageIds) {
         for (MessageId messageId : messageIds) {
-            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                if (!batchMessageId.ackIndividual()) {
-                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
-                } else {
-                    messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    doIndividualAckAsync((MessageIdImpl) messageId);
-                }
+                addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(),
+                        batchMessageId,
+                        this::doIndividualAckAsync,
+                        this::doIndividualBatchAckAsync);
+            } else if (messageId instanceof MessageIdImpl) {
+                addIndividualAcknowledgment((MessageIdImpl) messageId,
+                        null,
+                        this::doIndividualAckAsync,
+                        this::doIndividualBatchAckAsync);
             } else {
-                modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
-                doIndividualAckAsync((MessageIdImpl) messageId);
+                throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: "
+                        + messageId.getClass().getCanonicalName());
             }
         }
     }
@@ -183,67 +187,65 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                                                      Map<String, Long> properties) {
         if (msgId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                // ack this ack carry bitSet index and judge bit set are all ack
-                if (batchMessageId.ackIndividual()) {
-                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
-                    return doIndividualAck(messageId, properties);
-                } else if (batchIndexAckEnabled){
-                    return doIndividualBatchAck(batchMessageId, properties);
-                } else {
-                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
-                    // all ack complete
-                    return CompletableFuture.completedFuture(null);
-                }
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                if (batchMessageId.ackCumulative()) {
-                    return doCumulativeAck(msgId, properties, null);
-                } else {
-                    if (batchIndexAckEnabled) {
-                        return doCumulativeBatchIndexAck(batchMessageId, properties);
-                    } else {
-                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
-                        // ack
-                        if (AckType.Cumulative == ackType
-                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
-                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
-                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
-                        }
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-            }
+            return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
         } else {
-            if (ackType == AckType.Individual) {
-                consumer.onAcknowledge(msgId, null);
-                modifyMessageIdStatesInConsumer(msgId);
-                return doIndividualAck(msgId, properties);
-            } else {
-                consumer.onAcknowledgeCumulative(msgId, null);
-                return doCumulativeAck(msgId, properties, null);
-            }
+            return addAcknowledgment(msgId, ackType, properties, null);
         }
     }
 
-    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
-        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
-                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
-        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
-        return messageId;
-    }
-
-    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
-        consumer.getStats().incrementNumAcksSent(1);
-        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    private CompletableFuture<Void> addIndividualAcknowledgment(
+            MessageIdImpl msgId,
+            @Nullable BatchMessageIdImpl batchMessageId,
+            Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
+            Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
+        if (batchMessageId != null) {
+            consumer.onAcknowledge(batchMessageId, null);
+        } else {
+            consumer.onAcknowledge(msgId, null);
+        }
+        if (batchMessageId == null || batchMessageId.ackIndividual()) {
+            consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1);
+            consumer.getUnAckedMessageTracker().remove(msgId);
+            if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+                consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId);
+            }
+            return individualAckFunction.apply(msgId);
+        } else if (batchIndexAckEnabled) {
+            return batchAckFunction.apply(batchMessageId);
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
     }
 
-    private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl messageId) {
-        consumer.getUnAckedMessageTracker().remove(messageId);
-        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
-            consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+    private CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
+                                                      AckType ackType,
+                                                      Map<String, Long> properties,
+                                                      @Nullable BatchMessageIdImpl batchMessageId) {
+        switch (ackType) {
+            case Individual:
+                return addIndividualAcknowledgment(msgId,
+                        batchMessageId,
+                        __ -> doIndividualAck(__, properties),
+                        __ -> doIndividualBatchAck(__, properties));
+            case Cumulative:
+                if (batchMessageId != null) {
+                    consumer.onAcknowledgeCumulative(batchMessageId, null);
+                } else {
+                    consumer.onAcknowledgeCumulative(msgId, null);
+                }
+                if (batchMessageId == null || batchMessageId.ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
+                } else if (batchIndexAckEnabled) {
+                    return doCumulativeBatchIndexAck(batchMessageId, properties);
+                } else {
+                    if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                        doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                        batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }
+            default:
+                throw new IllegalStateException("Unknown AckType: " + ackType);
         }
     }
 
@@ -278,9 +280,10 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
 
-    private void doIndividualAckAsync(MessageIdImpl messageId) {
+    private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
         pendingIndividualAcks.add(messageId);
         pendingIndividualBatchIndexAcks.remove(messageId);
+        return CompletableFuture.completedFuture(null);
     }
 
     private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId,
@@ -343,10 +346,9 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         }
     }
 
-    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+    private CompletableFuture<Void> doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
         ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
-                new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
-                        batchMessageId.getPartitionIndex()), (v) -> {
+                batchMessageId.toMessageIdImpl(), __ -> {
                     ConcurrentBitSetRecyclable value;
                     if (batchMessageId.getAcker() != null
                             && !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) {
@@ -358,6 +360,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                     return value;
                 });
         bitSet.clear(batchMessageId.getBatchIndex());
+        return CompletableFuture.completedFuture(null);
     }
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {