You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/23 17:40:27 UTC
[pulsar] branch branch-2.8 updated: Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (#14260)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new a482307 Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (#14260)
a482307 is described below
commit a4823073d27bb75a46680dcd87013128725ab195
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Feb 14 11:18:11 2022 +0800
Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (#14260)
When a consumer sets `enableBatchIndexAcknowledgment=true`, the client executes `PersistentAcknowledgmentsGroupingTracker#doIndividualBatchAckAsync`:
https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L357-L372
There is an error on line 367; it should be
`value.set(0, batchMessageId.getBatchSize()); `
However, `batchMessageId.getBatchSize()` always returns `acker.getBatchSize()`:
https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java#L137-L139
If the condition on line 362 is false, the BatchMessageIdImpl only has a BatchMessageAckerDisabled acker, whose batch size is always 0.
So I have added `getOriginalBatchSize()`, which returns the batch size stored on the BatchMessageIdImpl itself (the user-specified batch size) instead of the acker's.
Also, when the debug log on line 556 is printed, `pendingIndividualBatchIndexAcks` is always empty, so it should be replaced with `entriesToAck`.
(cherry picked from commit 816eaed900bbff1a8514f349cd60e439c6db97bc)
---
.../pulsar/client/impl/BatchMessageIdImpl.java | 4 +++
.../PersistentAcknowledgmentsGroupingTracker.java | 7 ++--
.../impl/AcknowledgementsGroupingTrackerTest.java | 41 ++++++++++++++++++++--
3 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
index fd8ea72..75ab3a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java
@@ -138,6 +138,10 @@ public class BatchMessageIdImpl extends MessageIdImpl {
return acker.getBatchSize();
}
+ public int getOriginalBatchSize() {
+ return this.batchSize;
+ }
+
public MessageIdImpl prevBatchMessageId() {
return new MessageIdImpl(
ledgerId, entryId - 1, partitionIndex);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index aa65c61..e3d4123 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -363,7 +363,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
} else {
value = ConcurrentBitSetRecyclable.create();
- value.set(0, batchMessageId.getBatchIndex());
+ value.set(0, batchMessageId.getOriginalBatchSize());
}
return value;
});
@@ -546,8 +546,9 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
if (shouldFlush) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}",
- consumer, lastCumulativeAck, pendingIndividualAcks, pendingIndividualBatchIndexAcks);
+ log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}"
+ + " -- individual-batch-index-acks: {}",
+ consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck);
}
cnx.ctx().flush();
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 9632a88..c0b952a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -22,22 +22,27 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.BitSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.testng.annotations.AfterClass;
@@ -381,6 +386,36 @@ public class AcknowledgementsGroupingTrackerTest {
tracker.close();
}
+ @Test
+ public void testDoIndividualBatchAckAsync() throws Exception{
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ AcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
+ MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE);
+ BitSet bitSet = new BitSet(20);
+ for(int i = 0; i < 20; i ++) {
+ bitSet.set(i, true);
+ }
+ MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet));
+ Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class
+ .getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class);
+ doIndividualBatchAckAsync.setAccessible(true);
+ doIndividualBatchAckAsync.invoke(tracker, messageId1);
+ doIndividualBatchAckAsync.invoke(tracker, messageId2);
+ Field pendingIndividualBatchIndexAcks = PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
+ pendingIndividualBatchIndexAcks.setAccessible(true);
+ ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> batchIndexAcks =
+ (ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>) pendingIndividualBatchIndexAcks.get(tracker);
+ MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
+ MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
+ assertTrue(batchIndexAcks.containsKey(position1));
+ assertNotNull(batchIndexAcks.get(position1));
+ assertEquals(batchIndexAcks.get(position1).cardinality(), 9);
+ assertTrue(batchIndexAcks.containsKey(position2));
+ assertNotNull(batchIndexAcks.get(position2));
+ assertEquals(batchIndexAcks.get(position2).cardinality(), 19);
+ tracker.close();
+ }
+
public class ClientCnxTest extends ClientCnx {
public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {