You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/16 01:16:04 UTC

[pulsar] 03/08: Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (#14260)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b1bf0022443f110ccbd5a726118e2fa21ad41b93
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Feb 14 11:18:11 2022 +0800

    Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (#14260)
    
    When a consumer sets `enableBatchIndexAcknowledgment=true`, the client executes `PersistentAcknowledgmentsGroupingTracker#doIndividualBatchAckAsync`:
    https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L357-L372
    
    There is an error on line 367; it should be
    `value.set(0, batchMessageId.getBatchSize());`
    
    But `batchMessageId.getBatchSize()` always returns `acker.getBatchSize()`:
    https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java#L137-L139
    
    If the condition on line 362 is false, the BatchMessageIdImpl only has a BatchMessageAckerDisabled acker, whose batch size is always 0.
    So I have added `getOriginalBatchSize` to return the user-specified batch size.
    
    Also, when the log message on line 556 is printed, `pendingIndividualBatchIndexAcks` is always empty; it should be replaced with `entriesToAck`.
    
    (cherry picked from commit 816eaed900bbff1a8514f349cd60e439c6db97bc)
---
 .../pulsar/client/impl/BatchMessageIdImpl.java     |  4 +++
 .../PersistentAcknowledgmentsGroupingTracker.java  |  2 +-
 .../impl/AcknowledgementsGroupingTrackerTest.java  | 41 ++++++++++++++++++++--
 3 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index fd8ea72..75ab3a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -138,6 +138,10 @@ public class BatchMessageIdImpl extends MessageIdImpl {
         return acker.getBatchSize();
     }
 
+    public int getOriginalBatchSize() {
+        return this.batchSize;
+    }
+
     public MessageIdImpl prevBatchMessageId() {
         return new MessageIdImpl(
             ledgerId, entryId - 1, partitionIndex);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index aa65c61..e737a2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -363,7 +363,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                         value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
                     } else {
                         value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, batchMessageId.getBatchIndex());
+                        value.set(0, batchMessageId.getOriginalBatchSize());
                     }
                     return value;
                 });
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 9632a88..c0b952a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -22,22 +22,27 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.testng.annotations.AfterClass;
@@ -381,6 +386,36 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.close();
     }
 
+    @Test
+    public void testDoIndividualBatchAckAsync() throws Exception{
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        AcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+        MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE);
+        BitSet bitSet = new BitSet(20);
+        for(int i = 0; i < 20; i ++) {
+            bitSet.set(i, true);
+        }
+        MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet));
+        Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class
+                .getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class);
+        doIndividualBatchAckAsync.setAccessible(true);
+        doIndividualBatchAckAsync.invoke(tracker, messageId1);
+        doIndividualBatchAckAsync.invoke(tracker, messageId2);
+        Field pendingIndividualBatchIndexAcks = PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
+        pendingIndividualBatchIndexAcks.setAccessible(true);
+        ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> batchIndexAcks =
+                (ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>) pendingIndividualBatchIndexAcks.get(tracker);
+        MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
+        MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
+        assertTrue(batchIndexAcks.containsKey(position1));
+        assertNotNull(batchIndexAcks.get(position1));
+        assertEquals(batchIndexAcks.get(position1).cardinality(), 9);
+        assertTrue(batchIndexAcks.containsKey(position2));
+        assertNotNull(batchIndexAcks.get(position2));
+        assertEquals(batchIndexAcks.get(position2).cardinality(), 19);
+        tracker.close();
+    }
+
     public class ClientCnxTest extends ClientCnx {
 
         public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {