You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/06 16:40:48 UTC

[pulsar] branch master updated: Fix batch ackset recycled multiple times. (#7409)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 25a6907  Fix batch ackset recycled multiple times. (#7409)
25a6907 is described below

commit 25a690734f278299bfdaae118a2c02ecc25c125e
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 7 00:40:33 2020 +0800

    Fix batch ackset recycled multiple times. (#7409)
    
    * Fix batch ackset recycled multiple times.
    
    * Apply comments.
    
    * Update pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java
---
 .../client/impl/BatchMessageIndexAckTest.java      | 31 ++++++++++++++++++++++
 .../PersistentAcknowledgmentsGroupingTracker.java  |  1 +
 .../apache/pulsar/common/protocol/Commands.java    |  3 ---
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 114b9ae..3150f10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -181,4 +181,35 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
         // broker also need to handle the available permits.
         Assert.assertEquals(received.size(), 100);
     }
+
+    @Test
+    public void testDoNotRecycleAckSetMultipleTimes() throws Exception  {
+        final String topic = "persistent://my-property/my-ns/testSafeAckSetRecycle";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .batchingMaxMessages(10)
+                .blockIfQueueFull(true).topic(topic)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Hello Pulsar".getBytes());
+        }
+
+        // Receive and cumulatively acknowledge every message; none of these calls should throw an exception.
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledgeCumulative(consumer.receive());
+            // Sleep 2 ms (past the 1 ms acknowledgment group time) to make sure the group ack is flushed.
+            Thread.sleep(2);
+        }
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 937f005..6908979 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -205,6 +205,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         }
 
         final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+        bitSet.recycle();
         cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
         return true;
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0222f3c..7ae21e7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -961,9 +961,6 @@ public class Commands {
 
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
         ack.recycle();
-        if (ackSet != null) {
-            ackSet.recycle();
-        }
         ackBuilder.recycle();
         messageIdDataBuilder.recycle();
         messageIdData.recycle();