You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/06 16:40:48 UTC
[pulsar] branch master updated: Fix batch ackset recycled multiple times. (#7409)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 25a6907 Fix batch ackset recycled multiple times. (#7409)
25a6907 is described below
commit 25a690734f278299bfdaae118a2c02ecc25c125e
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 7 00:40:33 2020 +0800
Fix batch ackset recycled multiple times. (#7409)
* Fix batch ackset recycled multiple times.
* Apply comments.
* Update pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
---
.../client/impl/BatchMessageIndexAckTest.java | 31 ++++++++++++++++++++++
.../PersistentAcknowledgmentsGroupingTracker.java | 1 +
.../apache/pulsar/common/protocol/Commands.java | 3 ---
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 114b9ae..3150f10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -181,4 +181,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
// broker also need to handle the available permits.
Assert.assertEquals(received.size(), 100);
}
+
+ @Test
+ public void testDoNotRecycleAckSetMultipleTimes() throws Exception {
+ final String topic = "persistent://my-property/my-ns/testSafeAckSetRecycle";
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .batchingMaxMessages(10)
+ .blockIfQueueFull(true).topic(topic)
+ .create();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 100;
+ for (int i = 0; i < messages; i++) {
+ producer.sendAsync("Hello Pulsar".getBytes());
+ }
+
+ // Should not throw an exception.
+ for (int i = 0; i < messages; i++) {
+ consumer.acknowledgeCumulative(consumer.receive());
+ // make sure the group ack flushed.
+ Thread.sleep(2);
+ }
+
+ producer.close();
+ consumer.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 937f005..6908979 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -205,6 +205,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
}
final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+ bitSet.recycle();
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
return true;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0222f3c..7ae21e7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -961,9 +961,6 @@ public class Commands {
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
ack.recycle();
- if (ackSet != null) {
- ackSet.recycle();
- }
ackBuilder.recycle();
messageIdDataBuilder.recycle();
messageIdData.recycle();