You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/28 06:37:30 UTC

[GitHub] merlimat closed pull request #1424: Improve batch message acking by removing batch message tracker

merlimat closed pull request #1424: Improve batch message acking by removing batch message tracker
URL: https://github.com/apache/incubator-pulsar/pull/1424
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 2d7c7aea4..533ce8507 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -42,7 +43,6 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -434,7 +434,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
         Message<byte[]> lastunackedMsg = null;
         for (int i = 0; i < numMsgs; i++) {
             Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
-            LOG.info("received message {}", String.valueOf(msg.getData()));
+            LOG.info("received message {}", new String(msg.getData(), UTF_8));
             assertNotNull(msg);
             if (i == 8) {
                 consumer.acknowledgeCumulative(msg);
@@ -514,7 +514,6 @@ public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception {
         Thread.sleep(100);
         rolloverPerIntervalStats();
         assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
-        assertTrue(((ConsumerImpl<byte[]>) consumer).isBatchingAckTrackerEmpty());
         consumer.close();
         producer.close();
         noBatchProducer.close();
@@ -574,7 +573,6 @@ public void testBatchAndNonBatchCumulativeAcks() throws Exception {
         }
         Thread.sleep(100);
         assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
-        assertTrue(((ConsumerImpl<byte[]>) consumer).isBatchingAckTrackerEmpty());
         consumer.close();
         producer.close();
         noBatchProducer.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
new file mode 100644
index 000000000..75e50aa2f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.BitSet;
+
+class BatchMessageAcker {
+
+    static BatchMessageAcker newAcker(int batchSize) {
+        BitSet bitSet = new BitSet(batchSize);
+        bitSet.set(0, batchSize);
+        return new BatchMessageAcker(bitSet, batchSize);
+    }
+
+    // bitset shared across messages in the same batch.
+    private final int batchSize;
+    private final BitSet bitSet;
+    private boolean prevBatchCumulativelyAcked = false;
+
+    BatchMessageAcker(BitSet bitSet, int batchSize) {
+        this.bitSet = bitSet;
+        this.batchSize = batchSize;
+    }
+
+    @VisibleForTesting
+    BitSet getBitSet() {
+        return bitSet;
+    }
+
+    public synchronized int getBatchSize() {
+        return batchSize;
+    }
+
+    public synchronized boolean ackIndividual(int batchIndex) {
+        bitSet.clear(batchIndex);
+        return bitSet.isEmpty();
+    }
+
+    public synchronized boolean ackCumulative(int batchIndex) {
+        // +1 since to argument is exclusive
+        bitSet.clear(0, batchIndex + 1);
+        return bitSet.isEmpty();
+    }
+
+    // debug purpose
+    public synchronized int getOutstandingAcks() {
+        return bitSet.cardinality();
+    }
+
+    public void setPrevBatchCumulativelyAcked(boolean acked) {
+        this.prevBatchCumulativelyAcked = acked;
+    }
+
+    public boolean isPrevBatchCumulativelyAcked() {
+        return prevBatchCumulativelyAcked;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
new file mode 100644
index 000000000..a521d6344
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+class BatchMessageAckerDisabled extends BatchMessageAcker {
+
+    static final BatchMessageAckerDisabled INSTANCE = new BatchMessageAckerDisabled();
+
+    private BatchMessageAckerDisabled() {
+        super(null, 0);
+    }
+
+    @Override
+    public synchronized int getBatchSize() {
+        return 0;
+    }
+
+    @Override
+    public boolean ackIndividual(int batchIndex) {
+        return true;
+    }
+
+    @Override
+    public boolean ackCumulative(int batchIndex) {
+        return true;
+    }
+
+    @Override
+    public int getOutstandingAcks() {
+        return 0;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index f7c5bddb5..d87a6ab91 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -27,17 +27,27 @@
     private final static int NO_BATCH = -1;
     private final int batchIndex;
 
+    private final BatchMessageAcker acker;
+
     public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
+        this(ledgerId, entryId, partitionIndex, batchIndex, BatchMessageAckerDisabled.INSTANCE);
+    }
+
+    public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, BatchMessageAcker acker) {
         super(ledgerId, entryId, partitionIndex);
         this.batchIndex = batchIndex;
+        this.acker = acker;
     }
 
     public BatchMessageIdImpl(MessageIdImpl other) {
         super(other.ledgerId, other.entryId, other.partitionIndex);
         if (other instanceof BatchMessageIdImpl) {
-            this.batchIndex = ((BatchMessageIdImpl) other).batchIndex;
+            BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
+            this.batchIndex = otherId.batchIndex;
+            this.acker = otherId.acker;
         } else {
             this.batchIndex = NO_BATCH;
+            this.acker = BatchMessageAckerDisabled.INSTANCE;
         }
     }
 
@@ -95,4 +105,30 @@ public String toString() {
     public byte[] toByteArray() {
         return toByteArray(batchIndex);
     }
+
+    public boolean ackIndividual() {
+        return acker.ackIndividual(batchIndex);
+    }
+
+    public boolean ackCumulative() {
+        return acker.ackCumulative(batchIndex);
+    }
+
+    public int getOutstandingAcksInSameBatch() {
+        return acker.getOutstandingAcks();
+    }
+
+    public int getBatchSize() {
+        return acker.getBatchSize();
+    }
+
+    public MessageIdImpl prevBatchMessageId() {
+        return new MessageIdImpl(
+            ledgerId, entryId - 1, partitionIndex);
+    }
+
+    public BatchMessageAcker getAcker() {
+        return acker;
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 46709d20a..c2154315c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,13 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -110,7 +106,6 @@
     private final ReadWriteLock zeroQueueLock;
 
     private final UnAckedMessageTracker unAckedMessageTracker;
-    private final ConcurrentNavigableMap<MessageIdImpl, BitSet> batchMessageAckTracker;
 
     protected final ConsumerStatsRecorder stats;
     private final int priorityLevel;
@@ -155,7 +150,6 @@
         this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
         this.codecProvider = new CompressionCodecProvider();
         this.priorityLevel = conf.getPriorityLevel();
-        this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
 
@@ -219,7 +213,6 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
             cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
                 cnx.removeConsumer(consumerId);
                 log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
-                batchMessageAckTracker.clear();
                 unAckedMessageTracker.close();
                 unsubscribeFuture.complete(null);
                 setState(State.Closed);
@@ -355,75 +348,26 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         }
     }
 
-    // we may not be able to ack message being acked by client. However messages in prior
-    // batch may be ackable
-    private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message,
-                                           Map<String,Long> properties) {
-        // get entry before this message and ack that message on broker
-        MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
-        if (lowerKey != null) {
-            NavigableMap<MessageIdImpl, BitSet> entriesUpto = batchMessageAckTracker.headMap(lowerKey, true);
-            for (Object key : entriesUpto.keySet()) {
-                entriesUpto.remove(key);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] ack prior message {} to broker on cumulative ack for message {}", subscription,
-                        consumerId, lowerKey, batchMessageId);
-            }
-            sendAcknowledge(lowerKey, AckType.Cumulative, properties);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] no messages prior to message {}", subscription, consumerId, batchMessageId);
-            }
-        }
-    }
-
     boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackType,
                                    Map<String,Long> properties) {
-        // we keep track of entire batch and so need MessageIdImpl and cannot use BatchMessageIdImpl
-        MessageIdImpl message = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
-                batchMessageId.getPartitionIndex());
-        BitSet bitSet = batchMessageAckTracker.get(message);
-        if (bitSet == null) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] message not found {} for ack {}", subscription, consumerId, batchMessageId,
-                        ackType);
-            }
-            return true;
+        boolean isAllMsgsAcked;
+        if (ackType == AckType.Individual) {
+            isAllMsgsAcked = batchMessageId.ackIndividual();
+        } else {
+            isAllMsgsAcked = batchMessageId.ackCumulative();
         }
-        int batchIndex = batchMessageId.getBatchIndex();
-        // bitset is not thread-safe and requires external synchronization
-        int batchSize = 0;
-        // only used for debug-logging
         int outstandingAcks = 0;
-        boolean isAllMsgsAcked = false;
-        lock.writeLock().lock();
-        try {
-            batchSize = bitSet.length();
-            if (ackType == AckType.Individual) {
-                bitSet.clear(batchIndex);
-            } else {
-                // +1 since to argument is exclusive
-                bitSet.clear(0, batchIndex + 1);
-            }
-            isAllMsgsAcked = bitSet.isEmpty();
-            if (log.isDebugEnabled()) {
-                outstandingAcks = bitSet.cardinality();
-            }
-        } finally {
-            lock.writeLock().unlock();
+        if (log.isDebugEnabled()) {
+            outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch();
         }
 
+        int batchSize = batchMessageId.getBatchSize();
         // all messages in this batch have been acked
         if (isAllMsgsAcked) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] can ack message to broker {}, acktype {}, cardinality {}, length {}", subscription,
                         consumerName, batchMessageId, ackType, outstandingAcks, batchSize);
             }
-            if (ackType == AckType.Cumulative) {
-                batchMessageAckTracker.keySet().removeIf(m -> (m.compareTo(message) <= 0));
-            }
-            batchMessageAckTracker.remove(message);
             // increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual.
             // CumulativeAckType is handled while sending ack to broker
             if (ackType == AckType.Individual) {
@@ -431,9 +375,10 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
             }
             return true;
         } else {
-            // we cannot ack this message to broker. but prior message may be ackable
-            if (ackType == AckType.Cumulative) {
-                ackMessagesInEarlierBatch(batchMessageId, message, properties);
+            if (AckType.Cumulative == ackType
+                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                sendAcknowledge(batchMessageId.prevBatchMessageId(), AckType.Cumulative, properties);
+                batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
             }
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", subscription,
@@ -443,38 +388,6 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
         return false;
     }
 
-    // if we are consuming a mix of batch and non-batch messages then cumulative ack on non-batch messages
-    // should clean up the ack tracker as well
-    private void updateBatchAckTracker(MessageIdImpl message, AckType ackType) {
-        if (batchMessageAckTracker.isEmpty()) {
-            return;
-        }
-        MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
-        if (lowerKey != null) {
-            NavigableMap<MessageIdImpl, BitSet> entriesUpto = batchMessageAckTracker.headMap(lowerKey, true);
-            for (Object key : entriesUpto.keySet()) {
-                entriesUpto.remove(key);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] updated batch ack tracker up to message {} on cumulative ack for message {}",
-                        subscription, consumerId, lowerKey, message);
-            }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] no messages to clean up prior to message {}", subscription, consumerId, message);
-            }
-        }
-    }
-
-    /**
-     * helper method that returns current state of data structure used to track acks for batch messages
-     *
-     * @return true if all batch messages have been acknowledged
-     */
-    public boolean isBatchingAckTrackerEmpty() {
-        return batchMessageAckTracker.isEmpty();
-    }
-
     @Override
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
                                                     Map<String,Long> properties) {
@@ -496,11 +409,6 @@ public boolean isBatchingAckTrackerEmpty() {
                 return CompletableFuture.completedFuture(null);
             }
         }
-        // if we got a cumulative ack on non batch message, check if any earlier batch messages need to be removed
-        // from batch message tracker
-        if (ackType == AckType.Cumulative && !(messageId instanceof BatchMessageIdImpl)) {
-            updateBatchAckTracker((MessageIdImpl) messageId, ackType);
-        }
         return sendAcknowledge(messageId, ackType, properties);
     }
 
@@ -560,7 +468,6 @@ public void connectionOpened(final ClientCnx cnx) {
             currentSize = incomingMessages.size();
             startMessageId = clearReceiverQueue();
             unAckedMessageTracker.clear();
-            batchMessageAckTracker.clear();
         }
 
         boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
@@ -702,7 +609,6 @@ public void connectionFailed(PulsarClientException exception) {
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
-            batchMessageAckTracker.clear();
             unAckedMessageTracker.close();
             return CompletableFuture.completedFuture(null);
         }
@@ -710,7 +616,6 @@ public void connectionFailed(PulsarClientException exception) {
         if (!isConnected()) {
             log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription);
             setState(State.Closed);
-            batchMessageAckTracker.clear();
             unAckedMessageTracker.close();
             client.cleanupConsumer(this);
             return CompletableFuture.completedFuture(null);
@@ -730,7 +635,6 @@ public void connectionFailed(PulsarClientException exception) {
             if (exception == null || !cnx.ctx().channel().isActive()) {
                 log.info("[{}] [{}] Closed consumer", topic, subscription);
                 setState(State.Closed);
-                batchMessageAckTracker.clear();
                 unAckedMessageTracker.close();
                 closeFuture.complete(null);
                 client.cleanupConsumer(this);
@@ -925,15 +829,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
         // create ack tracker for entry aka batch
-        BitSet bitSet = new BitSet(batchSize);
         MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
                 getPartitionIndex());
-        bitSet.set(0, batchSize);
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] added bit set for message {}, cardinality {}, length {}", subscription, consumerName,
-                    batchMessage, bitSet.cardinality(), bitSet.length());
-        }
-        batchMessageAckTracker.put(batchMessage, bitSet);
+        BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
         unAckedMessageTracker.add(batchMessage);
 
         int skippedMessages = 0;
@@ -971,7 +869,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc
                 }
 
                 BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
-                        messageId.getEntryId(), getPartitionIndex(), i);
+                        messageId.getEntryId(), getPartitionIndex(), i, acker);
                 final MessageImpl<T> message = new MessageImpl<>(batchMessageIdImpl, msgMetadata,
                         singleMessageMetadataBuilder.build(), singleMessagePayload, cnx, schema);
                 lock.readLock().lock();
@@ -988,9 +886,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc
                 singleMessageMetadataBuilder.recycle();
             }
         } catch (IOException e) {
-            //
             log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
-            batchMessageAckTracker.remove(batchMessage);
             discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
         }
         if (log.isDebugEnabled()) {
@@ -1192,7 +1088,6 @@ public void redeliverUnacknowledgedMessages() {
                 currentSize = incomingMessages.size();
                 incomingMessages.clear();
                 unAckedMessageTracker.clear();
-                batchMessageAckTracker.clear();
             }
             cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
             if (currentSize > 0) {
@@ -1232,7 +1127,6 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = ids.stream().map(messageId -> {
                     // attempt to remove message from batchMessageAckTracker
-                    batchMessageAckTracker.remove(messageId);
                     builder.setPartition(messageId.getPartitionIndex());
                     builder.setLedgerId(messageId.getLedgerId());
                     builder.setEntryId(messageId.getEntryId());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d8ad4ed8..4bd2fda28 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -489,15 +489,6 @@ public void seek(MessageId messageId) throws PulsarClientException {
         return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics"));
     }
 
-    /**
-     * helper method that returns current state of data structure used to track acks for batch messages
-     *
-     * @return true if all batch messages have been acknowledged
-     */
-    public boolean isBatchingAckTrackerEmpty() {
-        return consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty);
-    }
-
     List<ConsumerImpl<T>> getConsumers() {
         return consumers;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index e39de96c1..1089cef53 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -536,16 +536,6 @@ public void seek(MessageId messageId) throws PulsarClientException {
         return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
     }
 
-    /**
-     * helper method that returns current state of data structure used to track acks for batch messages
-     *
-     * @return true if all batch messages have been acknowledged
-     */
-    public boolean isBatchingAckTrackerEmpty() {
-        return consumers.values().stream().allMatch(consumer -> consumer.isBatchingAckTrackerEmpty());
-    }
-
-
     @Override
     public int getAvailablePermits() {
         return consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
new file mode 100644
index 000000000..e32a2ef45
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.Test;
+
+public class BatchMessageAckerDisabledTest {
+
+    @Test
+    public void testAckIndividual() {
+        for (int i = 0; i < 10; i++) {
+            assertTrue(BatchMessageAckerDisabled.INSTANCE.ackIndividual(i));
+        }
+    }
+
+    @Test
+    public void testAckCumulative() {
+        for (int i = 0; i < 10; i++) {
+            assertTrue(BatchMessageAckerDisabled.INSTANCE.ackCumulative(i));
+        }
+    }
+
+    @Test
+    public void testGetOutstandingAcks() {
+        assertEquals(0, BatchMessageAckerDisabled.INSTANCE.getOutstandingAcks());
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
new file mode 100644
index 000000000..2bfa620d4
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class BatchMessageAckerTest {
+
+    private static final int BATCH_SIZE = 10;
+
+    private BatchMessageAcker acker;
+
+    @BeforeMethod
+    public void setup() {
+        acker = BatchMessageAcker.newAcker(10);
+    }
+
+    @Test
+    public void testAckers() {
+        assertEquals(BATCH_SIZE, acker.getOutstandingAcks());
+        assertEquals(BATCH_SIZE, acker.getBatchSize());
+
+        assertFalse(acker.ackIndividual(4));
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            if (4 == i) {
+                assertFalse(acker.getBitSet().get(i));
+            } else {
+                assertTrue(acker.getBitSet().get(i));
+            }
+        }
+
+        assertFalse(acker.ackCumulative(6));
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            if (i <= 6) {
+                assertFalse(acker.getBitSet().get(i));
+            } else {
+                assertTrue(acker.getBitSet().get(i));
+            }
+        }
+
+        for (int i = BATCH_SIZE - 1; i >= 8; i--) {
+            assertFalse(acker.ackIndividual(i));
+            assertFalse(acker.getBitSet().get(i));
+        }
+
+        assertTrue(acker.ackIndividual(7));
+        assertEquals(0, acker.getOutstandingAcks());
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services