You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/06/02 01:01:28 UTC

[pulsar] branch master updated: [PIP-54] Support acknowledgment at batch index level (#6052)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa0fbb0  [PIP-54] Support acknowledgment at batch index level (#6052)
aa0fbb0 is described below

commit aa0fbb0d31d36f383054d93e3684de79879c430c
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 2 09:01:17 2020 +0800

    [PIP-54] Support acknowledgment at batch index level (#6052)
    
    Master issue: #6253
    Fixes #5969
    
    ### Motivation
    
    Add support for ack batch message local index. Can be disabled at broker side by set batchIndexAcknowledgeEnable=false at broker.conf
    
    PIP-54 documentation will be created soon.
    
    ### Modifications
    
    1. Managed cursor support track and persistent local index of batch message.
    2. Client support send batch index ack to broker.
    3. The batch messages with index ack information dispatched to the client.
    4. Client skip the acked index.
    
    ### Verifying this change
    
    New unit tests added
---
 .../src/main/resources/pulsar/suppressions.xml     |    1 +
 conf/broker.conf                                   |    3 +
 .../apache/bookkeeper/mledger/ManagedCursor.java   |    5 +
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |   17 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  152 ++-
 .../bookkeeper/mledger/impl/PositionImpl.java      |   11 +
 managed-ledger/src/main/proto/MLDataFormats.proto  |   13 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |    5 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  135 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |    3 +
 .../broker/service/AbstractBaseDispatcher.java     |   12 +-
 .../pulsar/broker/service/BrokerService.java       |    2 +
 .../org/apache/pulsar/broker/service/Consumer.java |   41 +-
 .../broker/service/EntryBatchIndexesAcks.java      |   71 ++
 .../NonPersistentDispatcherMultipleConsumers.java  |    4 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |    4 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |    4 +-
 .../service/persistent/CompactorSubscription.java  |    1 -
 .../PersistentDispatcherMultipleConsumers.java     |    8 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |    6 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |    8 +-
 .../service/persistent/PersistentSubscription.java |    1 +
 .../broker/service/persistent/PersistentTopic.java |    3 +-
 .../apache/pulsar/client/impl/RawReaderImpl.java   |    3 +-
 .../pulsar/broker/service/ServerCnxTest.java       |    2 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |    2 +
 .../impl/BatchMessageIndexAckDisableTest.java      |  148 +++
 .../client/impl/BatchMessageIndexAckTest.java      |  184 +++
 .../client/impl/CompactedOutBatchMessageTest.java  |    2 +-
 .../impl/AcknowledgmentsGroupingTracker.java       |    2 +
 .../pulsar/client/impl/BatchMessageIdImpl.java     |    8 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |    2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   23 +-
 ...NonPersistentAcknowledgmentGroupingTracker.java |    5 +
 .../PersistentAcknowledgmentsGroupingTracker.java  |  104 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |    3 +
 .../pulsar/client/impl/BatchMessageIdImplTest.java |    2 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  198 ++++
 .../apache/pulsar/common/protocol/Commands.java    |   36 +-
 .../pulsar/common/util/SafeCollectionUtils.java    |   29 +-
 .../common/util/collections/BitSetRecyclable.java  | 1209 ++++++++++++++++++++
 .../common/util/collections/ConcurrentBitSet.java  |    4 +
 .../collections/ConcurrentBitSetRecyclable.java    |   50 +
 pulsar-common/src/main/proto/PulsarApi.proto       |    2 +
 .../BitSetRecyclableRecyclableTest.java            |   48 +
 .../ConcurrentBitSetRecyclableTest.java            |   33 +-
 46 files changed, 2498 insertions(+), 111 deletions(-)

diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 7fd94d9..d6d38a2 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -40,4 +40,5 @@
     <suppress checks=".*" files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java" />
     
     <suppress checks=".*" files="MLDataFormats.java" />
+    <suppress checks=".*" files="BitSetRecyclable.java" />
 </suppressions>
diff --git a/conf/broker.conf b/conf/broker.conf
index e897f23..80fef85 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -339,6 +339,9 @@ delayedDeliveryEnabled=true
 # Default is 1 second.
 delayedDeliveryTickTimeMillis=1000
 
+# Whether to enable acknowledge of batch local index.
+acknowledgmentAtBatchIndexLevelEnabled=false
+
 # Enable tracking of replicated subscriptions state across clusters.
 enableReplicatedSubscriptions=true
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index a48ed04..354c791 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -643,4 +643,9 @@ public interface ManagedCursor {
      * Trim delete entries for the given entries
      */
     void trimDeletedEntries(List<Entry> entries);
+
+    /**
+     * Get deleted batch indexes list for a batch message.
+     */
+    long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index bf23fe7..dde67c2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -42,6 +42,8 @@ public class ManagedLedgerConfig {
 
     private boolean createIfMissing = true;
     private int maxUnackedRangesToPersist = 10000;
+    private int maxBatchDeletedIndexToPersist = 10000;
+    private boolean deletionAtBatchIndexLevelEnabled = true;
     private int maxUnackedRangesToPersistInZk = 1000;
     private int maxEntriesPerLedger = 50000;
     private int maxSizePerLedgerMb = 100;
@@ -431,6 +433,13 @@ public class ManagedLedgerConfig {
     }
 
     /**
+     * @return max batch deleted index that will be persisted and recoverd.
+     */
+    public int getMaxBatchDeletedIndexToPersist() {
+        return maxBatchDeletedIndexToPersist;
+    }
+
+    /**
      * @param maxUnackedRangesToPersist
      *            max unacked message ranges that will be persisted and receverd.
      */
@@ -585,4 +594,12 @@ public class ManagedLedgerConfig {
             Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties) {
         this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
     }
+
+    public boolean isDeletionAtBatchIndexLevelEnabled() {
+        return deletionAtBatchIndexLevelEnabled;
+    }
+
+    public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) {
+        this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled;
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 43ae5df..8a3d31f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -43,11 +43,14 @@ import io.netty.util.concurrent.FastThreadLocal;
 
 import java.time.Clock;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,6 +93,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
@@ -156,6 +160,10 @@ public class ManagedCursorImpl implements ManagedCursor {
         return position;
     };
     private final LongPairRangeSet<PositionImpl> individualDeletedMessages;
+
+    // Maintain the deletion status for batch messages
+    // (ledgerId, entryId) -> deletion indexes
+    private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private RateLimiter markDeleteLimiter;
@@ -232,6 +240,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled()
                 ? new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter)
                 : new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter);
+        if (config.isDeletionAtBatchIndexLevelEnabled()) {
+            this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
+        } else {
+            this.batchDeletedIndexes = null;
+        }
         this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
         STATE_UPDATER.set(this, State.Uninitialized);
         PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
@@ -379,6 +392,10 @@ public class ManagedCursorImpl implements ManagedCursor {
                 if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
                     recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
                 }
+                if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null
+                    && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
+                    recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
+                }
                 recoveredCursor(position, recoveredProperties, lh);
                 callback.operationComplete();
             }, null);
@@ -398,6 +415,25 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    private void recoverBatchDeletedIndexes (List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchDeletedIndexInfoList) {
+        lock.writeLock().lock();
+        try {
+            this.batchDeletedIndexes.clear();
+            batchDeletedIndexInfoList.forEach(batchDeletedIndexInfo -> {
+                if (batchDeletedIndexInfo.getDeleteSetCount() > 0) {
+                    long[] array = new long[batchDeletedIndexInfo.getDeleteSetCount()];
+                    for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) {
+                        array[i] = batchDeletedIndexInfo.getDeleteSetList().get(i);
+                    }
+                    this.batchDeletedIndexes.put(PositionImpl.get(batchDeletedIndexInfo.getPosition().getLedgerId(),
+                        batchDeletedIndexInfo.getPosition().getEntryId()), BitSetRecyclable.create().resetWords(array));
+                }
+            });
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
                                  LedgerHandle recoveredFromCursorLedger) {
         // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
@@ -920,6 +956,10 @@ public class ManagedCursorImpl implements ManagedCursor {
                     lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
                             null, null);
                     individualDeletedMessages.clear();
+                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                        batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
+                        batchDeletedIndexes.clear();
+                    }
 
                     PositionImpl oldReadPosition = readPosition;
                     if (oldReadPosition.compareTo(newPosition) >= 0) {
@@ -1507,8 +1547,22 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
         }
+
         PositionImpl newPosition = (PositionImpl) position;
 
+        if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+            if (newPosition.ackSet != null) {
+                batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
+                newPosition = ledger.getPreviousPosition(newPosition);
+            }
+            Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition);
+            subMap.values().forEach(BitSetRecyclable::recycle);
+            subMap.clear();
+        } else if (newPosition.ackSet != null) {
+            newPosition = ledger.getPreviousPosition(newPosition);
+            newPosition.ackSet = null;
+        }
+
         if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
             if (log.isDebugEnabled()) {
                 log.debug(
@@ -1600,6 +1654,11 @@ public class ManagedCursorImpl implements ManagedCursor {
                 try {
                     individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
                             mdEntry.newPosition.getEntryId());
+                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                        Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true);
+                        subMap.values().forEach(BitSetRecyclable::recycle);
+                        subMap.clear();
+                    }
                 } finally {
                     lock.writeLock().unlock();
                 }
@@ -1722,35 +1781,62 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             for (Position pos : positions) {
                 PositionImpl position  = (PositionImpl) checkNotNull(pos);
-
                 if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
                     if (log.isDebugEnabled()) {
                         log.debug(
-                                "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
-                                ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+                            "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]",
+                            ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
                     }
                     callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
                     return;
                 }
 
                 if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
-                        || position.compareTo(markDeletePosition) <= 0) {
+                    || position.compareTo(markDeletePosition) <= 0) {
+                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                        BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
+                        if (bitSetRecyclable != null) {
+                            bitSetRecyclable.recycle();
+                        }
+                    }
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
                     }
                     continue;
                 }
-
-                // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
-                // the RangeSet recognize the "continuity" between adjacent Positions
-                PositionImpl previousPosition = ledger.getPreviousPosition(position);
-                individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
+                if (position.ackSet == null) {
+                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                        BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
+                        if (bitSetRecyclable != null) {
+                            bitSetRecyclable.recycle();
+                        }
+                    }
+                    // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
+                    // the RangeSet recognize the "continuity" between adjacent Positions
+                    PositionImpl previousPosition = ledger.getPreviousPosition(position);
+                    individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
                         position.getLedgerId(), position.getEntryId());
-                MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
+                    MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
 
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
                             individualDeletedMessages);
+                    }
+                } else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+                    BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
+                    BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
+                    bitSet.and(givenBitSet);
+                    givenBitSet.recycle();
+                    if (bitSet.isEmpty()) {
+                        PositionImpl previousPosition = ledger.getPreviousPosition(position);
+                        individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
+                            position.getLedgerId(), position.getEntryId());
+                        ++messagesConsumedCounter;
+                        BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
+                        if (bitSetRecyclable != null) {
+                            bitSetRecyclable.recycle();
+                        }
+                    }
                 }
             }
 
@@ -2062,6 +2148,9 @@ public class ManagedCursorImpl implements ManagedCursor {
         info.addAllProperties(buildPropertiesMap(properties));
         if (persistIndividualDeletedMessageRanges) {
             info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
+            if (config.isDeletionAtBatchIndexLevelEnabled()) {
+                info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList());
+            }
         }
 
         if (log.isDebugEnabled()) {
@@ -2306,11 +2395,38 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
+        if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
+            return Collections.emptyList();
+        }
+        MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
+            .newBuilder();
+        MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
+            .newBuilder();
+        List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
+        Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
+        while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
+            Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
+            nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
+            nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
+            batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
+            long[] array = entry.getValue().toLongArray();
+            List<Long> deleteSet = new ArrayList<>(array.length);
+            for (long l : array) {
+                deleteSet.add(l);
+            }
+            batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
+            result.add(batchDeletedIndexInfoBuilder.build());
+        }
+        return result;
+    }
+
     void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
         PositionImpl position = mdEntry.newPosition;
         PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
                 .setEntryId(position.getEntryId())
                 .addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
+                .addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
                 .addAllProperties(buildPropertiesMap(mdEntry.properties)).build();
 
 
@@ -2693,6 +2809,16 @@ public class ManagedCursorImpl implements ManagedCursor {
         return this;
     }
 
+    @Override
+    public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
+        if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
+            BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
+            return bitSet == null ? null : bitSet.toLongArray();
+        } else {
+            return null;
+        }
+    }
+  
     void updateReadStats(int readEntriesCount, long readEntriesSize) {
         this.entriesReadCount += readEntriesCount;
         this.entriesReadSize += readEntriesSize;
@@ -2723,7 +2849,5 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         return Math.min(maxEntriesBasedOnSize, maxEntries);
     }
-
-
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index f090b52..1c35ef1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -30,6 +30,7 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
 
     protected long ledgerId;
     protected long entryId;
+    protected long[] ackSet;
 
     public static final PositionImpl earliest = new PositionImpl(-1, -1);
     public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
@@ -49,6 +50,12 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
         this.entryId = entryId;
     }
 
+    public PositionImpl(long ledgerId, long entryId, long[] ackSet) {
+        this.ledgerId = ledgerId;
+        this.entryId = entryId;
+        this.ackSet = ackSet;
+    }
+
     public PositionImpl(PositionImpl other) {
         this.ledgerId = other.ledgerId;
         this.entryId = other.entryId;
@@ -58,6 +65,10 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
         return new PositionImpl(ledgerId, entryId);
     }
 
+    public static PositionImpl get(long ledgerId, long entryId, long[] ackSet) {
+        return new PositionImpl(ledgerId, entryId, ackSet);
+    }
+
     public static PositionImpl get(PositionImpl other) {
         return new PositionImpl(other);
     }
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 151cd69..8b1ecbf 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -68,6 +68,9 @@ message PositionInfo {
     // Additional custom properties associated with
 	// the current cursor position
 	repeated LongProperty properties = 4;
+
+    // Store which index in the batch message has been deleted
+    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
 }
 
 message NestedPositionInfo {
@@ -80,6 +83,11 @@ message MessageRange {
     required NestedPositionInfo upperEndpoint = 2;
 }
 
+message BatchedEntryDeletionIndexInfo {
+    required NestedPositionInfo position = 1;
+    repeated int64 deleteSet = 2;
+}
+
 // Generic string and long tuple
 message LongProperty {
     required string name = 1;
@@ -100,5 +108,8 @@ message ManagedCursorInfo {
 	// the current cursor position
 	repeated LongProperty properties = 5;
 
-  optional int64 lastActive = 6;
+    optional int64 lastActive = 6;
+
+    // Store which index in the batch message has been deleted
+    repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 0191057..ab87e6f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.testng.annotations.Test;
 
 public class ManagedCursorContainerTest {
@@ -326,6 +327,10 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
+            return new long[0];
+        }
+
         public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                 Object ctx) {
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index e3ba438..e53bcae 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -38,6 +38,7 @@ import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -81,12 +82,14 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -3056,6 +3059,138 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("test_batch_index_delete");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        final int totalEntries = 100;
+        final Position[] positions = new Position[totalEntries];
+        for (int i = 0; i < totalEntries; i++) {
+            // add entry
+            positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+        assertEquals(cursor.getNumberOfEntries(), totalEntries);
+        deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(2).setEnd(4).build()));
+        List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(2, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+
+        deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(8).build()));
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(2, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(8, deletedIndexes.get(0).getEnd());
+
+        deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(0).build()));
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(2, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(0, deletedIndexes.get(0).getEnd());
+        Assert.assertEquals(2, deletedIndexes.get(1).getStart());
+        Assert.assertEquals(8, deletedIndexes.get(1).getEnd());
+
+        deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(1).build()));
+        deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(9).setEnd(9).build()));
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertNull(deletedIndexes);
+        Assert.assertEquals(positions[0], cursor.getMarkDeletedPosition());
+
+        deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
+        cursor.delete(positions[1]);
+        deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(6).setEnd(8).build()));
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[1]), 10);
+        Assert.assertNull(deletedIndexes);
+
+        deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
+        cursor.markDelete(positions[3]);
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[2]), 10);
+        Assert.assertNull(deletedIndexes);
+
+        deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
+        cursor.resetCursor(positions[0]);
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[3]), 10);
+        Assert.assertNull(deletedIndexes);
+    }
+
+    @Test
+    public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        final int totalEntries = 100;
+        final Position[] positions = new Position[totalEntries];
+        for (int i = 0; i < totalEntries; i++) {
+            // add entry
+            positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+        assertEquals(cursor.getNumberOfEntries(), totalEntries);
+        deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build()));
+        deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
+        deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
+        deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
+        deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
+        deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
+
+        ledger = factory.open("test_batch_indexes_deletion_persistent");
+        cursor = ledger.openCursor("c1");
+        List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
+        Assert.assertEquals(deletedIndexes.size(), 1);
+        Assert.assertEquals(deletedIndexes.get(0).getStart(), 3);
+        Assert.assertEquals(deletedIndexes.get(0).getEnd(), 6);
+        Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[4]);
+        deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
+        Assert.assertNull(deletedIndexes);
+        Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]);
+    }
+
+    private void deleteBatchIndex(ManagedCursor cursor, Position position, int batchSize,
+                                  List<IntRange> deleteIndexes) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        PositionImpl pos = (PositionImpl) position;
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0, batchSize);
+        deleteIndexes.forEach(intRange -> {
+            bitSet.clear(intRange.getStart(), intRange.getEnd() + 1);
+        });
+        pos.ackSet = bitSet.toLongArray();
+
+        cursor.asyncDelete(pos,
+            new DeleteCallback() {
+                @Override
+                public void deleteComplete(Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        latch.await();
+        pos.ackSet = null;
+    }
+
+    private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize) {
+        if (bitSetLongArray == null) {
+            return null;
+        }
+        List<IntRange> result = new ArrayList<>();
+        BitSet bitSet = BitSet.valueOf(bitSetLongArray);
+        int nextClearBit = bitSet.nextClearBit(0);
+        IntRange.Builder builder = IntRange.newBuilder();
+        while (nextClearBit != -1 && nextClearBit <= batchSize) {
+            int nextSetBit = bitSet.nextSetBit(nextClearBit);
+            if (nextSetBit == -1) {
+                break;
+            }
+            result.add(builder.setStart(nextClearBit).setEnd(nextSetBit - 1).build());
+            nextClearBit = bitSet.nextClearBit(nextSetBit);
+        }
+        return result;
+    }
+  
     void testReadEntriesOrWaitWithMaxSize() throws Exception {
         ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxSize");
         ManagedCursor c = ledger.openCursor("c");
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a960dbc..c9689d1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -176,6 +176,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
             + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
     private long delayedDeliveryTickTimeMillis = 1000;
 
+    @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
+    private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
+
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "Enable the WebSocket API service in broker"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index c8da420..849d626 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -27,8 +27,10 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
@@ -66,7 +68,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
      *            an object where the total size in messages and bytes will be returned back to the caller
      */
     public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
-            SendMessageInfo sendMessageInfo) {
+             SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor) {
         int totalMessages = 0;
         long totalBytes = 0;
 
@@ -102,6 +104,14 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 totalMessages += batchSize;
                 totalBytes += metadataAndPayload.readableBytes();
                 batchSizes.setBatchSize(i, batchSize);
+                if (indexesAcks != null && cursor != null) {
+                    long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                    if (ackSet != null) {
+                        indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
+                    } else {
+                        indexesAcks.setIndexesAcks(i,null);
+                    }
+                }
             } finally {
                 msgMetadata.recycle();
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3b4f645..3abefc1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1065,6 +1065,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
             managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
 
+            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index eaefa40..d14ea5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -30,6 +30,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.naming.TopicName;
@@ -57,6 +59,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,8 +212,9 @@ public class Consumer {
      *
      * @return a SendMessageInfo object that contains the detail of what was sent to consumer
      */
-    public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, int totalMessages,
-            long totalBytes, RedeliveryTracker redeliveryTracker) {
+
+    public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
+               int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
         final ChannelHandlerContext ctx = cnx.ctx();
         final ChannelPromise writePromise = ctx.newPromise();
@@ -222,6 +226,9 @@ public class Consumer {
             }
             writePromise.setSuccess();
             batchSizes.recyle();
+            if (batchIndexesAcks != null) {
+                batchIndexesAcks.recycle();
+            }
             return writePromise;
         }
 
@@ -245,7 +252,8 @@ public class Consumer {
         AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);
 
         // reduce permit and increment unackedMsg count with total number of messages in batch-msgs
-        MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
+        int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
+        MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
         incrementUnackedMessages(totalMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
         msgOutCounter.add(totalMessages);
@@ -295,7 +303,8 @@ public class Consumer {
                 if (redeliveryTracker.contains(position)) {
                     redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
                 }
-                ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), ctx.voidPromise());
+                ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload,
+                    batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i)), ctx.voidPromise());
                 messageId.recycle();
                 messageIdBuilder.recycle();
                 entry.release();
@@ -304,6 +313,9 @@ public class Consumer {
             // Use an empty write here so that we can just tie the flush with the write promise for last entry
             ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
             batchSizes.recyle();
+            if (batchIndexesAcks != null) {
+                batchIndexesAcks.recycle();
+            }
         });
 
         return writePromise;
@@ -387,19 +399,30 @@ public class Consumer {
                 log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId);
                 return;
             }
-
-            MessageIdData msgId = ack.getMessageId(0);
-            PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+            PositionImpl position = PositionImpl.earliest;
+            if (ack.getMessageIdCount() == 1) {
+                MessageIdData msgId = ack.getMessageId(0);
+                if (msgId.getAckSetCount() > 0) {
+                    position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
+                } else {
+                    position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+                }
+            }
             subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, properties);
         } else {
             // Individual ack
             List<Position> positionsAcked = new ArrayList<>();
             for (int i = 0; i < ack.getMessageIdCount(); i++) {
                 MessageIdData msgId = ack.getMessageId(i);
-                PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+                PositionImpl position;
+                if (msgId.getAckSetCount() > 0) {
+                    position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
+                } else {
+                    position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
+                }
                 positionsAcked.add(position);
 
-                if (Subscription.isIndividualAckMode(subType)) {
+                if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetCount() == 0) {
                     removePendingAcks(position);
                 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
new file mode 100644
index 0000000..e41a2909
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+
+import io.netty.util.Recycler;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+public class EntryBatchIndexesAcks {
+
+    Pair<Integer, long[]>[] indexesAcks = new Pair[100];
+
+    public void setIndexesAcks(int entryIdx, Pair<Integer, long[]> indexesAcks) {
+        this.indexesAcks[entryIdx] = indexesAcks;
+    }
+
+    public long[] getAckSet(int entryIdx) {
+        Pair<Integer, long[]> pair = indexesAcks[entryIdx];
+        return pair == null ? null : pair.getRight();
+    }
+
+    public int getTotalAckedIndexCount() {
+        int count = 0;
+        for (Pair<Integer, long[]> pair : indexesAcks) {
+            if (pair != null) {
+                count += pair.getLeft() - BitSet.valueOf(pair.getRight()).cardinality();
+            }
+        }
+        return count;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    public static EntryBatchIndexesAcks get() {
+        return RECYCLER.get();
+    }
+
+    private EntryBatchIndexesAcks(Recycler.Handle<EntryBatchIndexesAcks> handle) {
+        this.handle = handle;
+    }
+
+    private final Recycler.Handle<EntryBatchIndexesAcks> handle;
+    private static final Recycler<EntryBatchIndexesAcks> RECYCLER = new Recycler<EntryBatchIndexesAcks>() {
+        @Override
+        protected EntryBatchIndexesAcks newObject(Handle<EntryBatchIndexesAcks> handle) {
+            return new EntryBatchIndexesAcks(handle);
+        }
+    };
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 08f5f4c..d97e0db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -201,8 +201,8 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
         if (consumer != null) {
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo);
-            consumer.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(),
+            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
+            consumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
 
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index e1c2e58..7db50cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -62,8 +62,8 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo);
-            currentConsumer.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(),
+            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
+            currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
         } else {
             entries.forEach(entry -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index c5183cd..76ac571 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -79,8 +79,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
                 if (consumer != null) {
                     SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                     EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
-                    filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo);
-                    consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo.getTotalMessages(),
+                    filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null);
+                    consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(),
                             sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
                     TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
                 } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 5d39c20..94af806 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -28,7 +28,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.compaction.CompactedTopic;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 621c6d2..7514103 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
@@ -518,15 +519,16 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo);
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
-                c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
+                c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
                 start += messagesForC;
                 entriesToDispatch -= messagesForC;
-                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -msgSent);
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 867c12b..4d856e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
@@ -237,13 +238,14 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         } else {
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo);
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+            filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
             int totalMessages = sendMessageInfo.getTotalMessages();
             long totalBytes = sendMessageInfo.getTotalBytes();
 
             currentConsumer
-                    .sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(),
+                    .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                             sendMessageInfo.getTotalBytes(), redeliveryTracker)
                     .addListener(future -> {
                         if (future.isSuccess()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 987ecfc..22e45d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
@@ -111,9 +112,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
                 SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
-                filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
+                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
+                filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
-                consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(),
+                consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
                             if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                                 readMoreEntries();
@@ -124,7 +126,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     entriesWithSameKey.getValue().remove(0);
                 }
 
-                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -sendMessageInfo.getTotalMessages());
+                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 24f02d2..041f511 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -349,6 +349,7 @@ public class PersistentSubscription implements Subscription {
                 log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
             }
             cursor.asyncMarkDelete(position, mergeCursorProperties(properties), markDeleteCallback, position);
+
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e3bd7b0..490988b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -716,8 +716,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 } catch (ManagedLedgerException e) {
                     subscriptionFuture.completeExceptionally(e);
                 }
-
-                return new PersistentSubscription(this, subscriptionName, cursor, false);
+            return new PersistentSubscription(this, subscriptionName, cursor, false);
             });
 
             if (!subscriptionFuture.isDone()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3ca072d..f7f35fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
@@ -203,7 +204,7 @@ public class RawReaderImpl implements RawReader {
         }
 
         @Override
-        void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) {
+        void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
                           messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 110d04c..44a2966 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1247,7 +1247,7 @@ public class ServerCnxTest {
 
         PositionImpl pos = new PositionImpl(0, 0);
 
-        clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), AckType.Individual,
+        clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual,
                                         null, Collections.emptyMap());
         channel.writeInbound(clientCommand);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 598f39d..7d1935a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -100,6 +100,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         doReturn(channelMock).when(consumerMock).sendMessages(
                 anyList(),
                 any(EntryBatchSizes.class),
+                any(EntryBatchIndexesAcks.class),
                 anyInt(),
                 anyLong(),
                 any(RedeliveryTracker.class)
@@ -142,6 +143,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         verify(consumerMock, times(2)).sendMessages(
                 anyList(),
                 any(EntryBatchSizes.class),
+                any(EntryBatchIndexesAcks.class),
                 totalMessagesCaptor.capture(),
                 anyLong(),
                 any(RedeliveryTracker.class)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
new file mode 100644
index 0000000..71305b3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class BatchMessageIndexAckDisableTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testBatchMessageIndexAckForSharedSubscription() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "testBatchMessageIndexAckForSharedSubscription";
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("sub")
+            .receiverQueueSize(100)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(1, TimeUnit.SECONDS)
+            .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+            .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.sendAsync(i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        for (int i = 0; i < messages; i++) {
+            if (i % 2 == 0) {
+                consumer.acknowledge(consumer.receive());
+            }
+        }
+
+        List<Message<Integer>> received = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            received.add(consumer.receive());
+        }
+
+        Assert.assertEquals(received.size(), 100);
+    }
+
+    @Test
+    public void testBatchMessageIndexAckForExclusiveSubscription() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "testBatchMessageIndexAckForExclusiveSubscription";
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("sub")
+            .receiverQueueSize(100)
+            .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+            .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.sendAsync(i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        for (int i = 0; i < messages; i++) {
+            if (i == 49) {
+                consumer.acknowledgeCumulative(consumer.receive());
+            } else {
+                consumer.receive();
+            }
+        }
+
+        //Wait ack send.
+        Thread.sleep(1000);
+        consumer.close();
+        consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("sub")
+            .receiverQueueSize(100)
+            .subscribe();
+
+        List<Message<Integer>> received = new ArrayList<>(100);
+        for (int i = 0; i < messages; i++) {
+            received.add(consumer.receive());
+        }
+
+        Assert.assertEquals(received.size(), 100);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
new file mode 100644
index 0000000..114b9ae
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class BatchMessageIndexAckTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testBatchMessageIndexAckForSharedSubscription() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "testBatchMessageIndexAckForSharedSubscription";
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("sub")
+            .receiverQueueSize(100)
+            .subscriptionType(SubscriptionType.Shared)
+            .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
+            .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+            .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.sendAsync(i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        for (int i = 0; i < messages; i++) {
+            if (i % 2 == 0) {
+                consumer.acknowledge(consumer.receive());
+            } else {
+                consumer.negativeAcknowledge(consumer.receive());
+            }
+        }
+
+        List<Message<Integer>> received = new ArrayList<>(50);
+        for (int i = 0; i < 50; i++) {
+            received.add(consumer.receive());
+        }
+
+        Assert.assertEquals(received.size(), 50);
+
+        Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS);
+        Assert.assertNull(moreMessage);
+
+        futures.clear();
+        for (int i = 0; i < 50; i++) {
+            futures.add(producer.sendAsync(i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        for (int i = 0; i < 50; i++) {
+            received.add(consumer.receive());
+        }
+
+        // Ensure the flow permit is work well since the client skip the acked batch index,
+        // broker also need to handle the available permits.
+        Assert.assertEquals(received.size(), 100);
+    }
+
+    @Test
+    public void testBatchMessageIndexAckForExclusiveSubscription() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "testBatchMessageIndexAckForExclusiveSubscription";
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("sub")
+            .receiverQueueSize(100)
+            .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+            .create();
+
+        final int messages = 100;
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.sendAsync(i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        for (int i = 0; i < messages; i++) {
+            if (i == 49) {
+                consumer.acknowledgeCumulative(consumer.receive());
+            } else {
+                consumer.receive();
+            }
+        }
+
+        //Wait ack send.
+        Thread.sleep(1000);
+        consumer.close();
+        consumer = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("sub")
+            .receiverQueueSize(100)
+            .subscribe();
+
+        List<Message<Integer>> received = new ArrayList<>(50);
+        for (int i = 0; i < 50; i++) {
+            received.add(consumer.receive());
+        }
+
+        Assert.assertEquals(received.size(), 50);
+
+        Message<Integer> moreMessage = consumer.receive(1, TimeUnit.SECONDS);
+        Assert.assertNull(moreMessage);
+
+        futures.clear();
+        for (int i = 0; i < 50; i++) {
+            futures.add(producer.sendAsync(i));
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        for (int i = 0; i < 50; i++) {
+            received.add(consumer.receive());
+        }
+
+        // Ensure the flow permit is work well since the client skip the acked batch index,
+        // broker also need to handle the available permits.
+        Assert.assertEquals(received.size(), 100);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index f702037..76b84df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -73,7 +73,7 @@ public class CompactedOutBatchMessageTest extends ProducerConsumerBase {
              = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
                 .subscriptionName("my-subscriber-name").subscribe()) {
             // shove it in the sideways
-            consumer.receiveIndividualMessagesFromBatch(metadata, 0, batchBuffer,
+            consumer.receiveIndividualMessagesFromBatch(metadata, 0, null, batchBuffer,
                                                         MessageIdData.newBuilder().setLedgerId(1234)
                                                         .setEntryId(567).build(), consumer.cnx());
             Message<?> m = consumer.receive();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index 93600ec..1517f12 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -31,6 +31,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
 
     void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
 
+    void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties);
+
     void flush();
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index f311936..f97467f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.MessageId;
 public class BatchMessageIdImpl extends MessageIdImpl {
     private final static int NO_BATCH = -1;
     private final int batchIndex;
+    private final int batchSize;
 
     private final transient BatchMessageAcker acker;
 
@@ -36,12 +37,13 @@ public class BatchMessageIdImpl extends MessageIdImpl {
     }
 
     public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
-        this(ledgerId, entryId, partitionIndex, batchIndex, BatchMessageAckerDisabled.INSTANCE);
+        this(ledgerId, entryId, partitionIndex, batchIndex, 0, BatchMessageAckerDisabled.INSTANCE);
     }
 
-    public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, BatchMessageAcker acker) {
+    public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize, BatchMessageAcker acker) {
         super(ledgerId, entryId, partitionIndex);
         this.batchIndex = batchIndex;
+        this.batchSize = batchSize;
         this.acker = acker;
     }
 
@@ -50,9 +52,11 @@ public class BatchMessageIdImpl extends MessageIdImpl {
         if (other instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
             this.batchIndex = otherId.batchIndex;
+            this.batchSize = otherId.batchSize;
             this.acker = otherId.acker;
         } else {
             this.batchIndex = NO_BATCH;
+            this.batchSize = 0;
             this.acker = BatchMessageAckerDisabled.INSTANCE;
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index b08a97c..6dde157 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -384,7 +384,7 @@ public class ClientCnx extends PulsarHandler {
         }
         ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
         if (consumer != null) {
-            consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), headersAndPayload, this);
+            consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), cmdMessage.getAckSetList(), headersAndPayload, this);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 220bd76..662222f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -32,6 +32,7 @@ import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -76,6 +77,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -95,6 +97,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -520,6 +523,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             ackType);
                 }
             } else {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+                acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
+                    batchMessageId.getBatchSize(), ackType, properties);
                 // other messages in batch are still pending ack.
                 return CompletableFuture.completedFuture(null);
             }
@@ -985,7 +991,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         });
     }
 
-    void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) {
+    void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
                     messageId.getEntryId());
@@ -1077,7 +1083,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             }
         } else {
             // handle batch message enqueuing; uncompressed payload has all messages in batch
-            receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx);
+            receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);
 
             uncompressedPayload.release();
             msgMetadata.recycle();
@@ -1168,7 +1174,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage));
     }
 
-    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload,
+    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
             MessageIdData messageId, ClientCnx cnx) {
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
@@ -1214,8 +1220,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     continue;
                 }
 
+                if (ackSet != null && BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) {
+                    singleMessagePayload.release();
+                    singleMessageMetadataBuilder.recycle();
+                    ++skippedMessages;
+                    continue;
+                }
+
                 BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
-                        messageId.getEntryId(), getPartitionIndex(), i, acker);
+                        messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker);
                 final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl,
                         msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload,
                         createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
@@ -1448,7 +1461,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError) {
-        ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), AckType.Individual,
+        ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, AckType.Individual,
                                       validationError, Collections.emptyMap());
         currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
         increaseAvailablePermits(currentCnx);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index 3c987d9..9061ac3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -46,6 +46,11 @@ public class NonPersistentAcknowledgmentGroupingTracker implements Acknowledgmen
     }
 
     @Override
+    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType, Map<String, Long> properties) {
+        // no-op
+    }
+
+    @Override
     public void flush() {
         // no-op
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 95ca4cf..927594c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -23,8 +23,10 @@ import io.netty.channel.EventLoopGroup;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -32,11 +34,13 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 
 /**
  * Group the acknowledgements for a certain time and then sends them out in a single protobuf command.
@@ -57,16 +61,21 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * Latest cumulative ack sent to broker
      */
     private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+    private volatile BitSetRecyclable lastCumulativeAckSet = null;
     private volatile boolean cumulativeAckFlushRequired = false;
 
     private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
             .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");
+    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, BitSetRecyclable> LAST_CUMULATIVE_ACK_SET_UPDATER = AtomicReferenceFieldUpdater
+        .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, BitSetRecyclable.class, "lastCumulativeAckSet");
+
 
     /**
      * This is a set of all the individual acks that the application has issued and that were not already sent to
      * broker.
      */
     private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
+    private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
 
     private final ScheduledFuture<?> scheduledTask;
 
@@ -74,6 +83,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                                                     EventLoopGroup eventLoopGroup) {
         this.consumer = consumer;
         this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
+        this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
         this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
 
         if (acknowledgementGroupTimeMicros > 0) {
@@ -103,7 +113,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             // uncommon condition since it's only used for the compaction subscription.
             doImmediateAck(msgId, ackType, properties);
         } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId);
+            doCumulativeAck(msgId, null);
         } else {
             // Individual ack
             if (msgId instanceof BatchMessageIdImpl) {
@@ -112,18 +122,51 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             } else {
                 pendingIndividualAcks.add(msgId);
             }
+            pendingIndividualBatchIndexAcks.remove(msgId);
             if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
                 flush();
             }
         }
     }
 
-    private void doCumulativeAck(MessageIdImpl msgId) {
+    @Override
+    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
+            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties);
+        } else if (ackType == AckType.Cumulative) {
+            BitSetRecyclable bitSet = BitSetRecyclable.create();
+            bitSet.set(0, batchSize);
+            bitSet.clear(0, batchIndex + 1);
+            doCumulativeAck(msgId, bitSet);
+        } else if (ackType == AckType.Individual) {
+            ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
+                new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> {
+                    ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create();
+                    value.set(0, batchSize + 1);
+                    value.clear(batchIndex);
+                    return value;
+                });
+            bitSet.clear(batchIndex);
+            if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                flush();
+            }
+        }
+    }
+
+    private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
         while (true) {
             MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
+            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
             if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId)) {
+                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, lastBitSet, bitSet)) {
+                    if (lastBitSet != null) {
+                        try {
+                            lastBitSet.recycle();
+                        } catch (Exception ignore) {
+                            // no-op
+                        }
+                    }
                     // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
                     cumulativeAckFlushRequired = true;
                     return;
@@ -142,13 +185,32 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             return false;
         }
 
-        final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), ackType, null,
+        final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, ackType, null,
                 properties);
 
         cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
         return true;
     }
 
+    private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties) {
+        ClientCnx cnx = consumer.getClientCnx();
+
+        if (cnx == null) {
+            return false;
+        }
+        BitSetRecyclable bitSet = BitSetRecyclable.create();
+        bitSet.set(0, batchSize);
+        if (ackType == AckType.Cumulative) {
+            bitSet.clear(0, batchIndex + 1);
+        } else {
+            bitSet.clear(batchIndex);
+        }
+
+        final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+        return true;
+    }
+
     /**
      * Flush all the pending acks and send them to the broker
      */
@@ -164,7 +226,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
+            ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, lastCumulativeAckSet,
                     AckType.Cumulative, null, Collections.emptyMap());
             cnx.ctx().write(cmd, cnx.ctx().voidPromise());
             shouldFlush=true;
@@ -172,22 +234,18 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         }
 
         // Flush all individual acks
+        List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
         if (!pendingIndividualAcks.isEmpty()) {
             if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 // We can send 1 single protobuf command with all individual acks
-                List<Pair<Long, Long>> entriesToAck = new ArrayList<>(pendingIndividualAcks.size());
                 while (true) {
                     MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
                     if (msgId == null) {
                         break;
                     }
 
-                    entriesToAck.add(Pair.of(msgId.getLedgerId(), msgId.getEntryId()));
+                    entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
                 }
-
-                cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, entriesToAck),
-                        cnx.ctx().voidPromise());
-                shouldFlush = true;
             } else {
                 // When talking to older brokers, send the acknowledgements individually
                 while (true) {
@@ -196,17 +254,33 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                         break;
                     }
 
-                    cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(),
+                    cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null,
                             AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise());
                     shouldFlush = true;
                 }
             }
         }
 
+        if (!pendingIndividualBatchIndexAcks.isEmpty()) {
+            Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> iterator = pendingIndividualBatchIndexAcks.entrySet().iterator();
+
+            while (iterator.hasNext()) {
+                Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry = iterator.next();
+                entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue()));
+                iterator.remove();
+            }
+        }
+
+        if (entriesToAck.size() > 0) {
+            cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, entriesToAck),
+                cnx.ctx().voidPromise());
+            shouldFlush = true;
+        }
+
         if (shouldFlush) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}",
-                        consumer, lastCumulativeAck, pendingIndividualAcks);
+                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}",
+                        consumer, lastCumulativeAck, pendingIndividualAcks, pendingIndividualBatchIndexAcks);
             }
             cnx.ctx().flush();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index a0de070..79e368e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -23,6 +23,7 @@ import static java.lang.String.format;
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
@@ -36,6 +37,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 
@@ -177,6 +179,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
 
     @Override
     void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount,
+            List<Long> ackSet,
             ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) {
         log.warn(
                 "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
index c32a593..2920737 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java
@@ -76,7 +76,7 @@ public class BatchMessageIdImplTest {
     public void deserializationTest() {
         // initialize BitSet with null
         BatchMessageAcker ackerDisabled = new BatchMessageAcker(null, 0);
-        BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0, ackerDisabled);
+        BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0, 0, ackerDisabled);
 
         ObjectWriter writer = ObjectMapperFactory.create().writerWithDefaultPrettyPrinter();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 9e0bf86..d3b9f3c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -1133,6 +1133,11 @@ public final class PulsarApi {
     // optional int32 batch_index = 4 [default = -1];
     boolean hasBatchIndex();
     int getBatchIndex();
+    
+    // repeated int64 ack_set = 5;
+    java.util.List<java.lang.Long> getAckSetList();
+    int getAckSetCount();
+    long getAckSet(int index);
   }
   public static final class MessageIdData extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -1209,11 +1214,26 @@ public final class PulsarApi {
       return batchIndex_;
     }
     
+    // repeated int64 ack_set = 5;
+    public static final int ACK_SET_FIELD_NUMBER = 5;
+    private java.util.List<java.lang.Long> ackSet_;
+    public java.util.List<java.lang.Long>
+        getAckSetList() {
+      return ackSet_;
+    }
+    public int getAckSetCount() {
+      return ackSet_.size();
+    }
+    public long getAckSet(int index) {
+      return ackSet_.get(index);
+    }
+    
     private void initFields() {
       ledgerId_ = 0L;
       entryId_ = 0L;
       partition_ = -1;
       batchIndex_ = -1;
+      ackSet_ = java.util.Collections.emptyList();;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1252,6 +1272,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeInt32(4, batchIndex_);
       }
+      for (int i = 0; i < ackSet_.size(); i++) {
+        output.writeInt64(5, ackSet_.get(i));
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -1276,6 +1299,15 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeInt32Size(4, batchIndex_);
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < ackSet_.size(); i++) {
+          dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+            .computeInt64SizeNoTag(ackSet_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getAckSetList().size();
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -1397,6 +1429,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000004);
         batchIndex_ = -1;
         bitField0_ = (bitField0_ & ~0x00000008);
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -1446,6 +1480,11 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000008;
         }
         result.batchIndex_ = batchIndex_;
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+          ackSet_ = java.util.Collections.unmodifiableList(ackSet_);
+          bitField0_ = (bitField0_ & ~0x00000010);
+        }
+        result.ackSet_ = ackSet_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -1464,6 +1503,16 @@ public final class PulsarApi {
         if (other.hasBatchIndex()) {
           setBatchIndex(other.getBatchIndex());
         }
+        if (!other.ackSet_.isEmpty()) {
+          if (ackSet_.isEmpty()) {
+            ackSet_ = other.ackSet_;
+            bitField0_ = (bitField0_ & ~0x00000010);
+          } else {
+            ensureAckSetIsMutable();
+            ackSet_.addAll(other.ackSet_);
+          }
+          
+        }
         return this;
       }
       
@@ -1521,6 +1570,11 @@ public final class PulsarApi {
               batchIndex_ = input.readInt32();
               break;
             }
+            case 40: {
+              ensureAckSetIsMutable();
+              ackSet_.add(input.readInt64());
+              break;
+            }
           }
         }
       }
@@ -1611,6 +1665,51 @@ public final class PulsarApi {
         return this;
       }
       
+      // repeated int64 ack_set = 5;
+      private java.util.List<java.lang.Long> ackSet_ = java.util.Collections.emptyList();;
+      private void ensureAckSetIsMutable() {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+          ackSet_ = new java.util.ArrayList<java.lang.Long>(ackSet_);
+          bitField0_ |= 0x00000010;
+         }
+      }
+      public java.util.List<java.lang.Long>
+          getAckSetList() {
+        return java.util.Collections.unmodifiableList(ackSet_);
+      }
+      public int getAckSetCount() {
+        return ackSet_.size();
+      }
+      public long getAckSet(int index) {
+        return ackSet_.get(index);
+      }
+      public Builder setAckSet(
+          int index, long value) {
+        ensureAckSetIsMutable();
+        ackSet_.set(index, value);
+        
+        return this;
+      }
+      public Builder addAckSet(long value) {
+        ensureAckSetIsMutable();
+        ackSet_.add(value);
+        
+        return this;
+      }
+      public Builder addAllAckSet(
+          java.lang.Iterable<? extends java.lang.Long> values) {
+        ensureAckSetIsMutable();
+        super.addAll(values, ackSet_);
+        
+        return this;
+      }
+      public Builder clearAckSet() {
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageIdData)
     }
     
@@ -17748,6 +17847,11 @@ public final class PulsarApi {
     // optional uint32 redelivery_count = 3 [default = 0];
     boolean hasRedeliveryCount();
     int getRedeliveryCount();
+    
+    // repeated int64 ack_set = 4;
+    java.util.List<java.lang.Long> getAckSetList();
+    int getAckSetCount();
+    long getAckSet(int index);
   }
   public static final class CommandMessage extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -17814,10 +17918,25 @@ public final class PulsarApi {
       return redeliveryCount_;
     }
     
+    // repeated int64 ack_set = 4;
+    public static final int ACK_SET_FIELD_NUMBER = 4;
+    private java.util.List<java.lang.Long> ackSet_;
+    public java.util.List<java.lang.Long>
+        getAckSetList() {
+      return ackSet_;
+    }
+    public int getAckSetCount() {
+      return ackSet_.size();
+    }
+    public long getAckSet(int index) {
+      return ackSet_.get(index);
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
       redeliveryCount_ = 0;
+      ackSet_ = java.util.Collections.emptyList();;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -17857,6 +17976,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeUInt32(3, redeliveryCount_);
       }
+      for (int i = 0; i < ackSet_.size(); i++) {
+        output.writeInt64(4, ackSet_.get(i));
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -17877,6 +17999,15 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt32Size(3, redeliveryCount_);
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < ackSet_.size(); i++) {
+          dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+            .computeInt64SizeNoTag(ackSet_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getAckSetList().size();
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -17996,6 +18127,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000002);
         redeliveryCount_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -18041,6 +18174,11 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000004;
         }
         result.redeliveryCount_ = redeliveryCount_;
+        if (((bitField0_ & 0x00000008) == 0x00000008)) {
+          ackSet_ = java.util.Collections.unmodifiableList(ackSet_);
+          bitField0_ = (bitField0_ & ~0x00000008);
+        }
+        result.ackSet_ = ackSet_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -18056,6 +18194,16 @@ public final class PulsarApi {
         if (other.hasRedeliveryCount()) {
           setRedeliveryCount(other.getRedeliveryCount());
         }
+        if (!other.ackSet_.isEmpty()) {
+          if (ackSet_.isEmpty()) {
+            ackSet_ = other.ackSet_;
+            bitField0_ = (bitField0_ & ~0x00000008);
+          } else {
+            ensureAckSetIsMutable();
+            ackSet_.addAll(other.ackSet_);
+          }
+          
+        }
         return this;
       }
       
@@ -18117,6 +18265,11 @@ public final class PulsarApi {
               redeliveryCount_ = input.readUInt32();
               break;
             }
+            case 32: {
+              ensureAckSetIsMutable();
+              ackSet_.add(input.readInt64());
+              break;
+            }
           }
         }
       }
@@ -18208,6 +18361,51 @@ public final class PulsarApi {
         return this;
       }
       
+      // repeated int64 ack_set = 4;
+      private java.util.List<java.lang.Long> ackSet_ = java.util.Collections.emptyList();;
+      private void ensureAckSetIsMutable() {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+          ackSet_ = new java.util.ArrayList<java.lang.Long>(ackSet_);
+          bitField0_ |= 0x00000008;
+         }
+      }
+      public java.util.List<java.lang.Long>
+          getAckSetList() {
+        return java.util.Collections.unmodifiableList(ackSet_);
+      }
+      public int getAckSetCount() {
+        return ackSet_.size();
+      }
+      public long getAckSet(int index) {
+        return ackSet_.get(index);
+      }
+      public Builder setAckSet(
+          int index, long value) {
+        ensureAckSetIsMutable();
+        ackSet_.set(index, value);
+        
+        return this;
+      }
+      public Builder addAckSet(long value) {
+        ensureAckSetIsMutable();
+        ackSet_.add(value);
+        
+        return this;
+      }
+      public Builder addAllAckSet(
+          java.lang.Iterable<? extends java.lang.Long> values) {
+        ensureAckSetIsMutable();
+        super.addAll(values, ackSet_);
+        
+        return this;
+      }
+      public Builder clearAckSet() {
+        ackSet_ = java.util.Collections.emptyList();;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandMessage)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0e1a1f9..cbc4a39 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -117,6 +117,9 @@ import org.apache.pulsar.common.api.proto.PulsarApi.TxnAction;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
@@ -437,13 +440,16 @@ public class Commands {
     }
 
     public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount,
-        ByteBuf metadataAndPayload) {
+        ByteBuf metadataAndPayload, long[] ackSet) {
         CommandMessage.Builder msgBuilder = CommandMessage.newBuilder();
         msgBuilder.setConsumerId(consumerId);
         msgBuilder.setMessageId(messageId);
         if (redeliveryCount > 0) {
             msgBuilder.setRedeliveryCount(redeliveryCount);
         }
+        if (ackSet != null) {
+            msgBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet));
+        }
         CommandMessage msg = msgBuilder.build();
         BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
         BaseCommand cmd = cmdBuilder.setType(Type.MESSAGE).setMessage(msg).build();
@@ -859,7 +865,8 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newMultiMessageAck(long consumerId, List<Pair<Long, Long>> entries) {
+    public static ByteBuf newMultiMessageAck(long consumerId,
+             List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
         CommandAck.Builder ackBuilder = CommandAck.newBuilder();
         ackBuilder.setConsumerId(consumerId);
         ackBuilder.setAckType(AckType.Individual);
@@ -867,14 +874,19 @@ public class Commands {
         int entriesCount = entries.size();
         for (int i = 0; i < entriesCount; i++) {
             long ledgerId = entries.get(i).getLeft();
-            long entryId = entries.get(i).getRight();
-
+            long entryId = entries.get(i).getMiddle();
+            ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
             MessageIdData.Builder messageIdDataBuilder = MessageIdData.newBuilder();
             messageIdDataBuilder.setLedgerId(ledgerId);
             messageIdDataBuilder.setEntryId(entryId);
+            if (bitSet != null) {
+                messageIdDataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(bitSet.toLongArray()));
+            }
             MessageIdData messageIdData = messageIdDataBuilder.build();
             ackBuilder.addMessageId(messageIdData);
-
+            if (bitSet != null) {
+                bitSet.recycle();
+            }
             messageIdDataBuilder.recycle();
         }
 
@@ -890,12 +902,12 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType,
+    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
                                  ValidationError validationError, Map<String, Long> properties) {
-        return newAck(consumerId, ledgerId, entryId, ackType, validationError, properties, 0, 0);
+        return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, 0, 0);
     }
 
-    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType,
+    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
                                  ValidationError validationError, Map<String, Long> properties, long txnIdLeastBits,
                                  long txnIdMostBits) {
         CommandAck.Builder ackBuilder = CommandAck.newBuilder();
@@ -904,6 +916,9 @@ public class Commands {
         MessageIdData.Builder messageIdDataBuilder = MessageIdData.newBuilder();
         messageIdDataBuilder.setLedgerId(ledgerId);
         messageIdDataBuilder.setEntryId(entryId);
+        if (ackSet != null) {
+            messageIdDataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet.toLongArray()));
+        }
         MessageIdData messageIdData = messageIdDataBuilder.build();
         ackBuilder.addMessageId(messageIdData);
         if (validationError != null) {
@@ -923,6 +938,9 @@ public class Commands {
 
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
         ack.recycle();
+        if (ackSet != null) {
+            ackSet.recycle();
+        }
         ackBuilder.recycle();
         messageIdDataBuilder.recycle();
         messageIdData.recycle();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java
similarity index 57%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java
index 93600ec..21488fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java
@@ -16,25 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
-import java.util.Map;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
- * Acknowledgments grouping tracker.
+ * Safe collection utils.
  */
-public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
+public class SafeCollectionUtils {
 
-    boolean isDuplicate(MessageId messageId);
+    public static List<Long> longArrayToList(long[] array) {
+        return array == null || array.length == 0 ? Collections.emptyList()
+            : Arrays.stream(array).boxed().collect(Collectors.toList());
+    }
 
-    void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
-
-    void flush();
-
-    @Override
-    void close();
-
-    void flushAndClean();
+    public static long[] longListToArray(List<Long> list) {
+        return list == null || list.size() == 0 ? new long[0] : list.stream().mapToLong(l->l).toArray();
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
new file mode 100644
index 0000000..285416a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
@@ -0,0 +1,1209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.LongBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * This this copy of {@link BitSet}.
+ * Provides {@link BitSetRecyclable#resetWords(long[])} method and leverage with netty recycler.
+ */
+public class BitSetRecyclable implements Cloneable, java.io.Serializable {
+    /*
+     * BitSets are packed into arrays of "words."  Currently a word is
+     * a long, which consists of 64 bits, requiring 6 address bits.
+     * The choice of word size is determined purely by performance concerns.
+     */
+    private final static int ADDRESS_BITS_PER_WORD = 6;
+    private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+    private final static int BIT_INDEX_MASK = BITS_PER_WORD - 1;
+
+    /* Used to shift left or right for a partial word mask */
+    private static final long WORD_MASK = 0xffffffffffffffffL;
+
+    /**
+     * @serialField bits long[]
+     *
+     * The bits in this BitSet.  The ith bit is stored in bits[i/64] at
+     * bit position i % 64 (where bit position 0 refers to the least
+     * significant bit and 63 refers to the most significant bit).
+     */
+    private static final ObjectStreamField[] serialPersistentFields = {
+        new ObjectStreamField("bits", long[].class),
+    };
+
+    /**
+     * The internal field corresponding to the serialField "bits".
+     */
+    private long[] words;
+
+    /**
+     * The number of words in the logical size of this BitSet.
+     */
+    private transient int wordsInUse = 0;
+
+    /**
+     * Whether the size of "words" is user-specified.  If so, we assume
+     * the user knows what he's doing and try harder to preserve it.
+     */
+    private transient boolean sizeIsSticky = false;
+
+    /* use serialVersionUID from JDK 1.0.2 for interoperability */
+    private static final long serialVersionUID = 7997698588986878753L;
+
+    /**
+     * Given a bit index, return word index containing it.
+     */
+    private static int wordIndex(int bitIndex) {
+        return bitIndex >> ADDRESS_BITS_PER_WORD;
+    }
+
+    /**
+     * Every public method must preserve these invariants.
+     */
+    private void checkInvariants() {
+        assert(wordsInUse == 0 || words[wordsInUse - 1] != 0);
+        assert(wordsInUse >= 0 && wordsInUse <= words.length);
+        assert(wordsInUse == words.length || words[wordsInUse] == 0);
+    }
+
+    /**
+     * Sets the field wordsInUse to the logical size in words of the bit set.
+     * WARNING:This method assumes that the number of words actually in use is
+     * less than or equal to the current value of wordsInUse!
+     */
+    private void recalculateWordsInUse() {
+        // Traverse the bitset until a used word is found
+        int i;
+        for (i = wordsInUse-1; i >= 0; i--)
+            if (words[i] != 0)
+                break;
+
+        wordsInUse = i+1; // The new logical size
+    }
+
+    /**
+     * Creates a new bit set. All bits are initially {@code false}.
+     */
+    public BitSetRecyclable() {
+        initWords(BITS_PER_WORD);
+        sizeIsSticky = false;
+    }
+
+    /**
+     * Creates a bit set whose initial size is large enough to explicitly
+     * represent bits with indices in the range {@code 0} through
+     * {@code nbits-1}. All bits are initially {@code false}.
+     *
+     * @param  nbits the initial size of the bit set
+     * @throws NegativeArraySizeException if the specified initial size
+     *         is negative
+     */
+    public BitSetRecyclable(int nbits) {
+        // nbits can't be negative; size 0 is OK
+        if (nbits < 0)
+            throw new NegativeArraySizeException("nbits < 0: " + nbits);
+
+        initWords(nbits);
+        sizeIsSticky = true;
+    }
+
+    private void initWords(int nbits) {
+        words = new long[wordIndex(nbits-1) + 1];
+    }
+
+    /**
+     * Creates a bit set using words as the internal representation.
+     * The last word (if there is one) must be non-zero.
+     */
+    private BitSetRecyclable(long[] words) {
+        this.words = words;
+        this.wordsInUse = words.length;
+        checkInvariants();
+    }
+
+    /**
+     * Returns a new bit set containing all the bits in the given long array.
+     *
+     * <p>More precisely,
+     * <br>{@code BitSet.valueOf(longs).get(n) == ((longs[n/64] & (1L<<(n%64))) != 0)}
+     * <br>for all {@code n < 64 * longs.length}.
+     *
+     * <p>This method is equivalent to
+     * {@code BitSet.valueOf(LongBuffer.wrap(longs))}.
+     *
+     * @param longs a long array containing a little-endian representation
+     *        of a sequence of bits to be used as the initial bits of the
+     *        new bit set
+     * @return a {@code BitSet} containing all the bits in the long array
+     * @since 1.7
+     */
+    public static BitSetRecyclable valueOf(long[] longs) {
+        int n;
+        for (n = longs.length; n > 0 && longs[n - 1] == 0; n--)
+            ;
+        return new BitSetRecyclable(Arrays.copyOf(longs, n));
+    }
+
+    /**
+     * Returns a new bit set containing all the bits in the given long
+     * buffer between its position and limit.
+     *
+     * <p>More precisely,
+     * <br>{@code BitSet.valueOf(lb).get(n) == ((lb.get(lb.position()+n/64) & (1L<<(n%64))) != 0)}
+     * <br>for all {@code n < 64 * lb.remaining()}.
+     *
+     * <p>The long buffer is not modified by this method, and no
+     * reference to the buffer is retained by the bit set.
+     *
+     * @param lb a long buffer containing a little-endian representation
+     *        of a sequence of bits between its position and limit, to be
+     *        used as the initial bits of the new bit set
+     * @return a {@code BitSet} containing all the bits in the buffer in the
+     *         specified range
+     * @since 1.7
+     */
+    public static BitSetRecyclable valueOf(LongBuffer lb) {
+        lb = lb.slice();
+        int n;
+        for (n = lb.remaining(); n > 0 && lb.get(n - 1) == 0; n--)
+            ;
+        long[] words = new long[n];
+        lb.get(words);
+        return new BitSetRecyclable(words);
+    }
+
+    /**
+     * Returns a new bit set containing all the bits in the given byte array.
+     *
+     * <p>More precisely,
+     * <br>{@code BitSet.valueOf(bytes).get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)}
+     * <br>for all {@code n <  8 * bytes.length}.
+     *
+     * <p>This method is equivalent to
+     * {@code BitSet.valueOf(ByteBuffer.wrap(bytes))}.
+     *
+     * @param bytes a byte array containing a little-endian
+     *        representation of a sequence of bits to be used as the
+     *        initial bits of the new bit set
+     * @return a {@code BitSet} containing all the bits in the byte array
+     * @since 1.7
+     */
+    public static BitSetRecyclable valueOf(byte[] bytes) {
+        return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes));
+    }
+
+    /**
+     * Returns a new bit set containing all the bits in the given byte
+     * buffer between its position and limit.
+     *
+     * <p>More precisely,
+     * <br>{@code BitSet.valueOf(bb).get(n) == ((bb.get(bb.position()+n/8) & (1<<(n%8))) != 0)}
+     * <br>for all {@code n < 8 * bb.remaining()}.
+     *
+     * <p>The byte buffer is not modified by this method, and no
+     * reference to the buffer is retained by the bit set.
+     *
+     * @param bb a byte buffer containing a little-endian representation
+     *        of a sequence of bits between its position and limit, to be
+     *        used as the initial bits of the new bit set
+     * @return a {@code BitSet} containing all the bits in the buffer in the
+     *         specified range
+     * @since 1.7
+     */
+    public static BitSetRecyclable valueOf(ByteBuffer bb) {
+        bb = bb.slice().order(ByteOrder.LITTLE_ENDIAN);
+        int n;
+        for (n = bb.remaining(); n > 0 && bb.get(n - 1) == 0; n--)
+            ;
+        long[] words = new long[(n + 7) / 8];
+        bb.limit(n);
+        int i = 0;
+        while (bb.remaining() >= 8)
+            words[i++] = bb.getLong();
+        for (int remaining = bb.remaining(), j = 0; j < remaining; j++)
+            words[i] |= (bb.get() & 0xffL) << (8 * j);
+        return new BitSetRecyclable(words);
+    }
+
+    /**
+     * Returns a new byte array containing all the bits in this bit set.
+     *
+     * <p>More precisely, if
+     * <br>{@code byte[] bytes = s.toByteArray();}
+     * <br>then {@code bytes.length == (s.length()+7)/8} and
+     * <br>{@code s.get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)}
+     * <br>for all {@code n < 8 * bytes.length}.
+     *
+     * @return a byte array containing a little-endian representation
+     *         of all the bits in this bit set
+     * @since 1.7
+     */
+    public byte[] toByteArray() {
+        int n = wordsInUse;
+        if (n == 0)
+            return new byte[0];
+        int len = 8 * (n-1);
+        for (long x = words[n - 1]; x != 0; x >>>= 8)
+            len++;
+        byte[] bytes = new byte[len];
+        ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+        for (int i = 0; i < n - 1; i++)
+            bb.putLong(words[i]);
+        for (long x = words[n - 1]; x != 0; x >>>= 8)
+            bb.put((byte) (x & 0xff));
+        return bytes;
+    }
+
+    /**
+     * Returns a new long array containing all the bits in this bit set.
+     *
+     * <p>More precisely, if
+     * <br>{@code long[] longs = s.toLongArray();}
+     * <br>then {@code longs.length == (s.length()+63)/64} and
+     * <br>{@code s.get(n) == ((longs[n/64] & (1L<<(n%64))) != 0)}
+     * <br>for all {@code n < 64 * longs.length}.
+     *
+     * @return a long array containing a little-endian representation
+     *         of all the bits in this bit set
+     * @since 1.7
+     */
+    public long[] toLongArray() {
+        return Arrays.copyOf(words, wordsInUse);
+    }
+
+    /**
+     * Ensures that the BitSet can hold enough words.
+     * @param wordsRequired the minimum acceptable number of words.
+     */
+    private void ensureCapacity(int wordsRequired) {
+        if (words.length < wordsRequired) {
+            // Allocate larger of doubled size or required size
+            int request = Math.max(2 * words.length, wordsRequired);
+            words = Arrays.copyOf(words, request);
+            sizeIsSticky = false;
+        }
+    }
+
+    /**
+     * Ensures that the BitSet can accommodate a given wordIndex,
+     * temporarily violating the invariants.  The caller must
+     * restore the invariants before returning to the user,
+     * possibly using recalculateWordsInUse().
+     * @param wordIndex the index to be accommodated.
+     */
+    private void expandTo(int wordIndex) {
+        int wordsRequired = wordIndex+1;
+        if (wordsInUse < wordsRequired) {
+            ensureCapacity(wordsRequired);
+            wordsInUse = wordsRequired;
+        }
+    }
+
+    /**
+     * Checks that fromIndex ... toIndex is a valid range of bit indices.
+     */
+    private static void checkRange(int fromIndex, int toIndex) {
+        if (fromIndex < 0)
+            throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex);
+        if (toIndex < 0)
+            throw new IndexOutOfBoundsException("toIndex < 0: " + toIndex);
+        if (fromIndex > toIndex)
+            throw new IndexOutOfBoundsException("fromIndex: " + fromIndex +
+                " > toIndex: " + toIndex);
+    }
+
+    /**
+     * Sets the bit at the specified index to the complement of its
+     * current value.
+     *
+     * @param  bitIndex the index of the bit to flip
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     * @since  1.4
+     */
+    public void flip(int bitIndex) {
+        if (bitIndex < 0)
+            throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
+
+        int wordIndex = wordIndex(bitIndex);
+        expandTo(wordIndex);
+
+        words[wordIndex] ^= (1L << bitIndex);
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Sets each bit from the specified {@code fromIndex} (inclusive) to the
+     * specified {@code toIndex} (exclusive) to the complement of its current
+     * value.
+     *
+     * @param  fromIndex index of the first bit to flip
+     * @param  toIndex index after the last bit to flip
+     * @throws IndexOutOfBoundsException if {@code fromIndex} is negative,
+     *         or {@code toIndex} is negative, or {@code fromIndex} is
+     *         larger than {@code toIndex}
+     * @since  1.4
+     */
+    public void flip(int fromIndex, int toIndex) {
+        checkRange(fromIndex, toIndex);
+
+        if (fromIndex == toIndex)
+            return;
+
+        int startWordIndex = wordIndex(fromIndex);
+        int endWordIndex   = wordIndex(toIndex - 1);
+        expandTo(endWordIndex);
+
+        long firstWordMask = WORD_MASK << fromIndex;
+        long lastWordMask  = WORD_MASK >>> -toIndex;
+        if (startWordIndex == endWordIndex) {
+            // Case 1: One word
+            words[startWordIndex] ^= (firstWordMask & lastWordMask);
+        } else {
+            // Case 2: Multiple words
+            // Handle first word
+            words[startWordIndex] ^= firstWordMask;
+
+            // Handle intermediate words, if any
+            for (int i = startWordIndex+1; i < endWordIndex; i++)
+                words[i] ^= WORD_MASK;
+
+            // Handle last word
+            words[endWordIndex] ^= lastWordMask;
+        }
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Sets the bit at the specified index to {@code true}.
+     *
+     * @param  bitIndex a bit index
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     * @since  JDK1.0
+     */
+    public void set(int bitIndex) {
+        if (bitIndex < 0)
+            throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
+
+        int wordIndex = wordIndex(bitIndex);
+        expandTo(wordIndex);
+
+        words[wordIndex] |= (1L << bitIndex); // Restores invariants
+
+        checkInvariants();
+    }
+
+    /**
+     * Sets the bit at the specified index to the specified value.
+     *
+     * @param  bitIndex a bit index
+     * @param  value a boolean value to set
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     * @since  1.4
+     */
+    public void set(int bitIndex, boolean value) {
+        if (value)
+            set(bitIndex);
+        else
+            clear(bitIndex);
+    }
+
+    /**
+     * Sets the bits from the specified {@code fromIndex} (inclusive) to the
+     * specified {@code toIndex} (exclusive) to {@code true}.
+     *
+     * @param  fromIndex index of the first bit to be set
+     * @param  toIndex index after the last bit to be set
+     * @throws IndexOutOfBoundsException if {@code fromIndex} is negative,
+     *         or {@code toIndex} is negative, or {@code fromIndex} is
+     *         larger than {@code toIndex}
+     * @since  1.4
+     */
+    public void set(int fromIndex, int toIndex) {
+        checkRange(fromIndex, toIndex);
+
+        if (fromIndex == toIndex)
+            return;
+
+        // Increase capacity if necessary
+        int startWordIndex = wordIndex(fromIndex);
+        int endWordIndex   = wordIndex(toIndex - 1);
+        expandTo(endWordIndex);
+
+        long firstWordMask = WORD_MASK << fromIndex;
+        long lastWordMask  = WORD_MASK >>> -toIndex;
+        if (startWordIndex == endWordIndex) {
+            // Case 1: One word
+            words[startWordIndex] |= (firstWordMask & lastWordMask);
+        } else {
+            // Case 2: Multiple words
+            // Handle first word
+            words[startWordIndex] |= firstWordMask;
+
+            // Handle intermediate words, if any
+            for (int i = startWordIndex+1; i < endWordIndex; i++)
+                words[i] = WORD_MASK;
+
+            // Handle last word (restores invariants)
+            words[endWordIndex] |= lastWordMask;
+        }
+
+        checkInvariants();
+    }
+
+    /**
+     * Sets the bits from the specified {@code fromIndex} (inclusive) to the
+     * specified {@code toIndex} (exclusive) to the specified value.
+     *
+     * @param  fromIndex index of the first bit to be set
+     * @param  toIndex index after the last bit to be set
+     * @param  value value to set the selected bits to
+     * @throws IndexOutOfBoundsException if {@code fromIndex} is negative,
+     *         or {@code toIndex} is negative, or {@code fromIndex} is
+     *         larger than {@code toIndex}
+     * @since  1.4
+     */
+    public void set(int fromIndex, int toIndex, boolean value) {
+        if (value)
+            set(fromIndex, toIndex);
+        else
+            clear(fromIndex, toIndex);
+    }
+
+    /**
+     * Sets the bit specified by the index to {@code false}.
+     *
+     * @param  bitIndex the index of the bit to be cleared
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     * @since  JDK1.0
+     */
+    public void clear(int bitIndex) {
+        if (bitIndex < 0)
+            throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
+
+        int wordIndex = wordIndex(bitIndex);
+        if (wordIndex >= wordsInUse)
+            return;
+
+        words[wordIndex] &= ~(1L << bitIndex);
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Sets the bits from the specified {@code fromIndex} (inclusive) to the
+     * specified {@code toIndex} (exclusive) to {@code false}.
+     *
+     * @param  fromIndex index of the first bit to be cleared
+     * @param  toIndex index after the last bit to be cleared
+     * @throws IndexOutOfBoundsException if {@code fromIndex} is negative,
+     *         or {@code toIndex} is negative, or {@code fromIndex} is
+     *         larger than {@code toIndex}
+     * @since  1.4
+     */
+    public void clear(int fromIndex, int toIndex) {
+        checkRange(fromIndex, toIndex);
+
+        if (fromIndex == toIndex)
+            return;
+
+        int startWordIndex = wordIndex(fromIndex);
+        if (startWordIndex >= wordsInUse)
+            return;
+
+        int endWordIndex = wordIndex(toIndex - 1);
+        if (endWordIndex >= wordsInUse) {
+            toIndex = length();
+            endWordIndex = wordsInUse - 1;
+        }
+
+        long firstWordMask = WORD_MASK << fromIndex;
+        long lastWordMask  = WORD_MASK >>> -toIndex;
+        if (startWordIndex == endWordIndex) {
+            // Case 1: One word
+            words[startWordIndex] &= ~(firstWordMask & lastWordMask);
+        } else {
+            // Case 2: Multiple words
+            // Handle first word
+            words[startWordIndex] &= ~firstWordMask;
+
+            // Handle intermediate words, if any
+            for (int i = startWordIndex+1; i < endWordIndex; i++)
+                words[i] = 0;
+
+            // Handle last word
+            words[endWordIndex] &= ~lastWordMask;
+        }
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Sets all of the bits in this BitSet to {@code false}.
+     *
+     * @since 1.4
+     */
+    public void clear() {
+        while (wordsInUse > 0)
+            words[--wordsInUse] = 0;
+    }
+
+    /**
+     * Returns the value of the bit with the specified index. The value
+     * is {@code true} if the bit with the index {@code bitIndex}
+     * is currently set in this {@code BitSet}; otherwise, the result
+     * is {@code false}.
+     *
+     * @param  bitIndex   the bit index
+     * @return the value of the bit with the specified index
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     */
+    public boolean get(int bitIndex) {
+        if (bitIndex < 0)
+            throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
+
+        checkInvariants();
+
+        int wordIndex = wordIndex(bitIndex);
+        return (wordIndex < wordsInUse)
+            && ((words[wordIndex] & (1L << bitIndex)) != 0);
+    }
+
+    /**
+     * Returns a new {@code BitSet} composed of bits from this {@code BitSet}
+     * from {@code fromIndex} (inclusive) to {@code toIndex} (exclusive).
+     *
+     * @param  fromIndex index of the first bit to include
+     * @param  toIndex index after the last bit to include
+     * @return a new {@code BitSet} from a range of this {@code BitSet}
+     * @throws IndexOutOfBoundsException if {@code fromIndex} is negative,
+     *         or {@code toIndex} is negative, or {@code fromIndex} is
+     *         larger than {@code toIndex}
+     * @since  1.4
+     */
+    public BitSetRecyclable get(int fromIndex, int toIndex) {
+        checkRange(fromIndex, toIndex);
+
+        checkInvariants();
+
+        int len = length();
+
+        // If no set bits in range return empty bitset
+        if (len <= fromIndex || fromIndex == toIndex)
+            return new BitSetRecyclable(0);
+
+        // An optimization
+        if (toIndex > len)
+            toIndex = len;
+
+        BitSetRecyclable result = new BitSetRecyclable(toIndex - fromIndex);
+        int targetWords = wordIndex(toIndex - fromIndex - 1) + 1;
+        int sourceIndex = wordIndex(fromIndex);
+        boolean wordAligned = ((fromIndex & BIT_INDEX_MASK) == 0);
+
+        // Process all words but the last word
+        for (int i = 0; i < targetWords - 1; i++, sourceIndex++)
+            result.words[i] = wordAligned ? words[sourceIndex] :
+                (words[sourceIndex] >>> fromIndex) |
+                    (words[sourceIndex+1] << -fromIndex);
+
+        // Process the last word
+        long lastWordMask = WORD_MASK >>> -toIndex;
+        result.words[targetWords - 1] =
+            ((toIndex-1) & BIT_INDEX_MASK) < (fromIndex & BIT_INDEX_MASK)
+                ? /* straddles source words */
+                ((words[sourceIndex] >>> fromIndex) |
+                    (words[sourceIndex+1] & lastWordMask) << -fromIndex)
+                :
+                ((words[sourceIndex] & lastWordMask) >>> fromIndex);
+
+        // Set wordsInUse correctly
+        result.wordsInUse = targetWords;
+        result.recalculateWordsInUse();
+        result.checkInvariants();
+
+        return result;
+    }
+
+    /**
+     * Returns the index of the first bit that is set to {@code true}
+     * that occurs on or after the specified starting index. If no such
+     * bit exists then {@code -1} is returned.
+     *
+     * <p>To iterate over the {@code true} bits in a {@code BitSet},
+     * use the following loop:
+     *
+     *  <pre> {@code
+     * for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
+     *     // operate on index i here
+     *     if (i == Integer.MAX_VALUE) {
+     *         break; // or (i+1) would overflow
+     *     }
+     * }}</pre>
+     *
+     * @param  fromIndex the index to start checking from (inclusive)
+     * @return the index of the next set bit, or {@code -1} if there
+     *         is no such bit
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     * @since  1.4
+     */
+    public int nextSetBit(int fromIndex) {
+        if (fromIndex < 0)
+            throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex);
+
+        checkInvariants();
+
+        int u = wordIndex(fromIndex);
+        if (u >= wordsInUse)
+            return -1;
+
+        long word = words[u] & (WORD_MASK << fromIndex);
+
+        while (true) {
+            if (word != 0)
+                return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word);
+            if (++u == wordsInUse)
+                return -1;
+            word = words[u];
+        }
+    }
+
+    /**
+     * Returns the index of the first bit that is set to {@code false}
+     * that occurs on or after the specified starting index.
+     *
+     * @param  fromIndex the index to start checking from (inclusive)
+     * @return the index of the next clear bit
+     * @throws IndexOutOfBoundsException if the specified index is negative
+     * @since  1.4
+     */
+    public int nextClearBit(int fromIndex) {
+        // Neither spec nor implementation handle bitsets of maximal length.
+        // See 4816253.
+        if (fromIndex < 0)
+            throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex);
+
+        checkInvariants();
+
+        int u = wordIndex(fromIndex);
+        if (u >= wordsInUse)
+            return fromIndex;
+
+        long word = ~words[u] & (WORD_MASK << fromIndex);
+
+        while (true) {
+            if (word != 0)
+                return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word);
+            if (++u == wordsInUse)
+                return wordsInUse * BITS_PER_WORD;
+            word = ~words[u];
+        }
+    }
+
+    /**
+     * Returns the index of the nearest bit that is set to {@code true}
+     * that occurs on or before the specified starting index.
+     * If no such bit exists, or if {@code -1} is given as the
+     * starting index, then {@code -1} is returned.
+     *
+     * <p>To iterate over the {@code true} bits in a {@code BitSet},
+     * use the following loop:
+     *
+     *  <pre> {@code
+     * for (int i = bs.length(); (i = bs.previousSetBit(i-1)) >= 0; ) {
+     *     // operate on index i here
+     * }}</pre>
+     *
+     * @param  fromIndex the index to start checking from (inclusive)
+     * @return the index of the previous set bit, or {@code -1} if there
+     *         is no such bit
+     * @throws IndexOutOfBoundsException if the specified index is less
+     *         than {@code -1}
+     * @since  1.7
+     */
+    public int previousSetBit(int fromIndex) {
+        if (fromIndex < 0) {
+            if (fromIndex == -1)
+                return -1;
+            throw new IndexOutOfBoundsException(
+                "fromIndex < -1: " + fromIndex);
+        }
+
+        checkInvariants();
+
+        int u = wordIndex(fromIndex);
+        if (u >= wordsInUse)
+            return length() - 1;
+
+        long word = words[u] & (WORD_MASK >>> -(fromIndex+1));
+
+        while (true) {
+            if (word != 0)
+                return (u+1) * BITS_PER_WORD - 1 - Long.numberOfLeadingZeros(word);
+            if (u-- == 0)
+                return -1;
+            word = words[u];
+        }
+    }
+
+    /**
+     * Returns the index of the nearest bit that is set to {@code false}
+     * that occurs on or before the specified starting index.
+     * If no such bit exists, or if {@code -1} is given as the
+     * starting index, then {@code -1} is returned.
+     *
+     * @param  fromIndex the index to start checking from (inclusive)
+     * @return the index of the previous clear bit, or {@code -1} if there
+     *         is no such bit
+     * @throws IndexOutOfBoundsException if the specified index is less
+     *         than {@code -1}
+     * @since  1.7
+     */
+    public int previousClearBit(int fromIndex) {
+        if (fromIndex < 0) {
+            if (fromIndex == -1)
+                return -1;
+            throw new IndexOutOfBoundsException(
+                "fromIndex < -1: " + fromIndex);
+        }
+
+        checkInvariants();
+
+        int u = wordIndex(fromIndex);
+        if (u >= wordsInUse)
+            return fromIndex;
+
+        long word = ~words[u] & (WORD_MASK >>> -(fromIndex+1));
+
+        while (true) {
+            if (word != 0)
+                return (u+1) * BITS_PER_WORD -1 - Long.numberOfLeadingZeros(word);
+            if (u-- == 0)
+                return -1;
+            word = ~words[u];
+        }
+    }
+
+    /**
+     * Returns the "logical size" of this {@code BitSet}: the index of
+     * the highest set bit in the {@code BitSet} plus one. Returns zero
+     * if the {@code BitSet} contains no set bits.
+     *
+     * @return the logical size of this {@code BitSet}
+     * @since  1.2
+     */
+    public int length() {
+        if (wordsInUse == 0)
+            return 0;
+
+        return BITS_PER_WORD * (wordsInUse - 1) +
+            (BITS_PER_WORD - Long.numberOfLeadingZeros(words[wordsInUse - 1]));
+    }
+
+    /**
+     * Returns true if this {@code BitSet} contains no bits that are set
+     * to {@code true}.
+     *
+     * @return boolean indicating whether this {@code BitSet} is empty
+     * @since  1.4
+     */
+    public boolean isEmpty() {
+        return wordsInUse == 0;
+    }
+
+    /**
+     * Returns true if the specified {@code BitSet} has any bits set to
+     * {@code true} that are also set to {@code true} in this {@code BitSet}.
+     *
+     * @param  set {@code BitSet} to intersect with
+     * @return boolean indicating whether this {@code BitSet} intersects
+     *         the specified {@code BitSet}
+     * @since  1.4
+     */
+    public boolean intersects(BitSetRecyclable set) {
+        for (int i = Math.min(wordsInUse, set.wordsInUse) - 1; i >= 0; i--)
+            if ((words[i] & set.words[i]) != 0)
+                return true;
+        return false;
+    }
+
+    /**
+     * Returns the number of bits set to {@code true} in this {@code BitSet}.
+     *
+     * @return the number of bits set to {@code true} in this {@code BitSet}
+     * @since  1.4
+     */
+    public int cardinality() {
+        int sum = 0;
+        for (int i = 0; i < wordsInUse; i++)
+            sum += Long.bitCount(words[i]);
+        return sum;
+    }
+
+    /**
+     * Performs a logical <b>AND</b> of this target bit set with the
+     * argument bit set. This bit set is modified so that each bit in it
+     * has the value {@code true} if and only if it both initially
+     * had the value {@code true} and the corresponding bit in the
+     * bit set argument also had the value {@code true}.
+     *
+     * @param set a bit set
+     */
+    public void and(BitSetRecyclable set) {
+        if (this == set)
+            return;
+
+        while (wordsInUse > set.wordsInUse)
+            words[--wordsInUse] = 0;
+
+        // Perform logical AND on words in common
+        for (int i = 0; i < wordsInUse; i++)
+            words[i] &= set.words[i];
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Performs a logical <b>OR</b> of this bit set with the bit set
+     * argument. This bit set is modified so that a bit in it has the
+     * value {@code true} if and only if it either already had the
+     * value {@code true} or the corresponding bit in the bit set
+     * argument has the value {@code true}.
+     *
+     * @param set a bit set
+     */
+    public void or(BitSetRecyclable set) {
+        if (this == set)
+            return;
+
+        int wordsInCommon = Math.min(wordsInUse, set.wordsInUse);
+
+        if (wordsInUse < set.wordsInUse) {
+            ensureCapacity(set.wordsInUse);
+            wordsInUse = set.wordsInUse;
+        }
+
+        // Perform logical OR on words in common
+        for (int i = 0; i < wordsInCommon; i++)
+            words[i] |= set.words[i];
+
+        // Copy any remaining words
+        if (wordsInCommon < set.wordsInUse)
+            System.arraycopy(set.words, wordsInCommon,
+                words, wordsInCommon,
+                wordsInUse - wordsInCommon);
+
+        // recalculateWordsInUse() is unnecessary
+        checkInvariants();
+    }
+
+    /**
+     * Performs a logical <b>XOR</b> of this bit set with the bit set
+     * argument. This bit set is modified so that a bit in it has the
+     * value {@code true} if and only if one of the following
+     * statements holds:
+     * <ul>
+     * <li>The bit initially has the value {@code true}, and the
+     *     corresponding bit in the argument has the value {@code false}.
+     * <li>The bit initially has the value {@code false}, and the
+     *     corresponding bit in the argument has the value {@code true}.
+     * </ul>
+     *
+     * @param  set a bit set
+     */
+    public void xor(BitSetRecyclable set) {
+        int wordsInCommon = Math.min(wordsInUse, set.wordsInUse);
+
+        if (wordsInUse < set.wordsInUse) {
+            ensureCapacity(set.wordsInUse);
+            wordsInUse = set.wordsInUse;
+        }
+
+        // Perform logical XOR on words in common
+        for (int i = 0; i < wordsInCommon; i++)
+            words[i] ^= set.words[i];
+
+        // Copy any remaining words
+        if (wordsInCommon < set.wordsInUse)
+            System.arraycopy(set.words, wordsInCommon,
+                words, wordsInCommon,
+                set.wordsInUse - wordsInCommon);
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Clears all of the bits in this {@code BitSet} whose corresponding
+     * bit is set in the specified {@code BitSet}.
+     *
+     * @param  set the {@code BitSet} with which to mask this
+     *         {@code BitSet}
+     * @since  1.2
+     */
+    public void andNot(BitSetRecyclable set) {
+        // Perform logical (a & !b) on words in common
+        for (int i = Math.min(wordsInUse, set.wordsInUse) - 1; i >= 0; i--)
+            words[i] &= ~set.words[i];
+
+        recalculateWordsInUse();
+        checkInvariants();
+    }
+
+    /**
+     * Returns the hash code value for this bit set. The hash code depends
+     * only on which bits are set within this {@code BitSet}.
+     *
+     * <p>The hash code is defined to be the result of the following
+     * calculation:
+     *  <pre> {@code
+     * public int hashCode() {
+     *     long h = 1234;
+     *     long[] words = toLongArray();
+     *     for (int i = words.length; --i >= 0; )
+     *         h ^= words[i] * (i + 1);
+     *     return (int)((h >> 32) ^ h);
+     * }}</pre>
+     * Note that the hash code changes if the set of bits is altered.
+     *
+     * @return the hash code value for this bit set
+     */
+    public int hashCode() {
+        long h = 1234;
+        for (int i = wordsInUse; --i >= 0; )
+            h ^= words[i] * (i + 1);
+
+        return (int)((h >> 32) ^ h);
+    }
+
+    /**
+     * Returns the number of bits of space actually in use by this
+     * {@code BitSet} to represent bit values.
+     * The maximum element in the set is the size - 1st element.
+     *
+     * @return the number of bits currently in this bit set
+     */
+    public int size() {
+        return words.length * BITS_PER_WORD;
+    }
+
+    /**
+     * Compares this object against the specified object.
+     * The result is {@code true} if and only if the argument is
+     * not {@code null} and is a {@code Bitset} object that has
+     * exactly the same set of bits set to {@code true} as this bit
+     * set. That is, for every nonnegative {@code int} index {@code k},
+     * <pre>((BitSet)obj).get(k) == this.get(k)</pre>
+     * must be true. The current sizes of the two bit sets are not compared.
+     *
+     * @param  obj the object to compare with
+     * @return {@code true} if the objects are the same;
+     *         {@code false} otherwise
+     * @see    #size()
+     */
+    public boolean equals(Object obj) {
+        if (!(obj instanceof BitSetRecyclable))
+            return false;
+        if (this == obj)
+            return true;
+
+        BitSetRecyclable set = (BitSetRecyclable) obj;
+
+        checkInvariants();
+        set.checkInvariants();
+
+        if (wordsInUse != set.wordsInUse)
+            return false;
+
+        // Check words in use by both BitSets
+        for (int i = 0; i < wordsInUse; i++)
+            if (words[i] != set.words[i])
+                return false;
+
+        return true;
+    }
+
+    /**
+     * Cloning this {@code BitSet} produces a new {@code BitSet}
+     * that is equal to it.
+     * The clone of the bit set is another bit set that has exactly the
+     * same bits set to {@code true} as this bit set.
+     *
+     * @return a clone of this bit set
+     * @see    #size()
+     */
+    public Object clone() {
+        if (! sizeIsSticky)
+            trimToSize();
+
+        try {
+            BitSetRecyclable result = (BitSetRecyclable) super.clone();
+            result.words = words.clone();
+            result.checkInvariants();
+            return result;
+        } catch (CloneNotSupportedException e) {
+            throw new InternalError(e);
+        }
+    }
+
+    /**
+     * Attempts to reduce internal storage used for the bits in this bit set.
+     * Calling this method may, but is not required to, affect the value
+     * returned by a subsequent call to the {@link #size()} method.
+     */
+    private void trimToSize() {
+        if (wordsInUse != words.length) {
+            words = Arrays.copyOf(words, wordsInUse);
+            checkInvariants();
+        }
+    }
+
+    /**
+     * Save the state of the {@code BitSet} instance to a stream (i.e.,
+     * serialize it).
+     */
+    private void writeObject(ObjectOutputStream s)
+        throws IOException {
+
+        checkInvariants();
+
+        if (! sizeIsSticky)
+            trimToSize();
+
+        ObjectOutputStream.PutField fields = s.putFields();
+        fields.put("bits", words);
+        s.writeFields();
+    }
+
+    /**
+     * Reconstitute the {@code BitSet} instance from a stream (i.e.,
+     * deserialize it).
+     */
+    private void readObject(ObjectInputStream s)
+        throws IOException, ClassNotFoundException {
+
+        ObjectInputStream.GetField fields = s.readFields();
+        words = (long[]) fields.get("bits", null);
+
+        // Assume maximum length then find real length
+        // because recalculateWordsInUse assumes maintenance
+        // or reduction in logical size
+        wordsInUse = words.length;
+        recalculateWordsInUse();
+        sizeIsSticky = (words.length > 0 && words[words.length-1] == 0L); // heuristic
+        checkInvariants();
+    }
+
+    /**
+     * Returns a string representation of this bit set. For every index
+     * for which this {@code BitSet} contains a bit in the set
+     * state, the decimal representation of that index is included in
+     * the result. Such indices are listed in order from lowest to
+     * highest, separated by ",&nbsp;" (a comma and a space) and
+     * surrounded by braces, resulting in the usual mathematical
+     * notation for a set of integers.
+     *
+     * <p>Example:
+     * <pre>
+     * BitSet drPepper = new BitSet();</pre>
+     * Now {@code drPepper.toString()} returns "{@code {}}".
+     * <pre>
+     * drPepper.set(2);</pre>
+     * Now {@code drPepper.toString()} returns "{@code {2}}".
+     * <pre>
+     * drPepper.set(4);
+     * drPepper.set(10);</pre>
+     * Now {@code drPepper.toString()} returns "{@code {2, 4, 10}}".
+     *
+     * @return a string representation of this bit set
+     */
+    public String toString() {
+        checkInvariants();
+
+        int numBits = (wordsInUse > 128) ?
+            cardinality() : wordsInUse * BITS_PER_WORD;
+        StringBuilder b = new StringBuilder(6*numBits + 2);
+        b.append('{');
+
+        int i = nextSetBit(0);
+        if (i != -1) {
+            b.append(i);
+            while (true) {
+                if (++i < 0) break;
+                if ((i = nextSetBit(i)) < 0) break;
+                int endOfRun = nextClearBit(i);
+                do { b.append(", ").append(i); }
+                while (++i != endOfRun);
+            }
+        }
+
+        b.append('}');
+        return b.toString();
+    }
+
+    public BitSetRecyclable resetWords(long[] words) {
+        int n;
+        for (n = words.length; n > 0 && words[n - 1] == 0; n--)
+            ;
+        long[] longs = Arrays.copyOf(words, n);
+        this.words = longs;
+        this.wordsInUse = longs.length;
+        checkInvariants();
+        return this;
+    }
+
+    private Handle<BitSetRecyclable> recyclerHandle = null;
+
+    private static final Recycler<BitSetRecyclable> RECYCLER = new Recycler<BitSetRecyclable>() {
+        protected BitSetRecyclable newObject(Recycler.Handle<BitSetRecyclable> recyclerHandle) {
+            return new BitSetRecyclable(recyclerHandle);
+        }
+    };
+
+    private BitSetRecyclable(Handle<BitSetRecyclable> recyclerHandle) {
+        this();
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    public static BitSetRecyclable create() {
+        return RECYCLER.get();
+    }
+
+    public void recycle() {
+        if (recyclerHandle != null) {
+            this.clear();
+            recyclerHandle.recycle(this);
+        }
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 8f80d03..24decbc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -29,6 +29,10 @@ public class ConcurrentBitSet extends BitSet {
     private static final long serialVersionUID = 1L;
     private final StampedLock rwLock = new StampedLock();
 
+    public ConcurrentBitSet() {
+        super();
+    }
+
     /**
      * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range
      * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
new file mode 100644
index 0000000..8e787c1
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Safe multithreaded version of {@code BitSet} and leverage netty recycler.
+ */
+public class ConcurrentBitSetRecyclable extends ConcurrentBitSet {
+
+    private final Handle<ConcurrentBitSetRecyclable> recyclerHandle;
+
+    private static final Recycler<ConcurrentBitSetRecyclable> RECYCLER = new Recycler<ConcurrentBitSetRecyclable>() {
+        protected ConcurrentBitSetRecyclable newObject(Handle<ConcurrentBitSetRecyclable> recyclerHandle) {
+            return new ConcurrentBitSetRecyclable(recyclerHandle);
+        }
+    };
+
+    private ConcurrentBitSetRecyclable(Handle<ConcurrentBitSetRecyclable> recyclerHandle) {
+        super();
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    public static ConcurrentBitSetRecyclable create() {
+        return RECYCLER.get();
+    }
+
+    public void recycle() {
+        this.clear();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index b4b7bd9..a0aa8a7 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -54,6 +54,7 @@ message MessageIdData {
     required uint64 entryId  = 2;
     optional int32 partition = 3 [default = -1];
     optional int32 batch_index = 4 [default = -1];
+    repeated int64 ack_set = 5;
 }
 
 message KeyValue {
@@ -458,6 +459,7 @@ message CommandMessage {
     required uint64 consumer_id       = 1;
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
+    repeated int64 ack_set = 4;
 }
 
 message CommandAck {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
new file mode 100644
index 0000000..0f42f35
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class BitSetRecyclableRecyclableTest {
+
+    @Test
+    public void testRecycle() {
+        BitSetRecyclable bitset1 = BitSetRecyclable.create();
+        bitset1.set(3);
+        bitset1.recycle();
+        BitSetRecyclable bitset2 = BitSetRecyclable.create();
+        BitSetRecyclable bitset3 = BitSetRecyclable.create();
+        Assert.assertSame(bitset2, bitset1);
+        Assert.assertFalse(bitset2.get(3));
+        Assert.assertNotSame(bitset3, bitset1);
+    }
+
+    @Test
+    public void testResetWords() {
+        BitSetRecyclable bitset1 = BitSetRecyclable.create();
+        BitSetRecyclable bitset2 = BitSetRecyclable.create();
+        bitset1.set(256);
+        bitset2.set(128);
+        bitset1.resetWords(bitset2.toLongArray());
+        Assert.assertTrue(bitset1.get(128));
+        Assert.assertFalse(bitset1.get(256));
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
similarity index 55%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
copy to pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
index 93600ec..b037c70 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java
@@ -16,25 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util.collections;
 
-import java.util.Map;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
-/**
- * Acknowledgments grouping tracker.
- */
-public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
-
-    boolean isDuplicate(MessageId messageId);
-
-    void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
-
-    void flush();
-
-    @Override
-    void close();
+public class ConcurrentBitSetRecyclableTest {
 
-    void flushAndClean();
+    @Test
+    public void testRecycle() {
+        ConcurrentBitSetRecyclable bitset1 = ConcurrentBitSetRecyclable.create();
+        bitset1.set(3);
+        bitset1.recycle();
+        ConcurrentBitSetRecyclable bitset2 = ConcurrentBitSetRecyclable.create();
+        ConcurrentBitSetRecyclable bitset3 = ConcurrentBitSetRecyclable.create();
+        Assert.assertSame(bitset2, bitset1);
+        Assert.assertFalse(bitset2.get(3));
+        Assert.assertNotSame(bitset3, bitset1);
+    }
 }