You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/06 16:15:30 UTC

[pulsar] branch master updated: Forget to update memory usage on producer close (#11906)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ad9efae  Forget to update memory usage on producer close (#11906)
ad9efae is described below

commit ad9efae1abf9675f830052ab1d4697330b23a750
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Sep 7 00:14:38 2021 +0800

    Forget to update memory usage on producer close (#11906)
---
 .../pulsar/client/impl/ProducerMemoryLimitTest.java  | 20 +++++++++++++++++++-
 .../org/apache/pulsar/client/impl/ProducerImpl.java  |  2 ++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index b6ec6a5..264ec30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -59,7 +59,7 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
                 .create();
         this.stopBroker();
         try {
-            producer.send("memroy-test".getBytes(StandardCharsets.UTF_8));
+            producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
             throw new IllegalStateException("can not reach here");
         } catch (PulsarClientException.TimeoutException ex) {
             PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
@@ -69,6 +69,24 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
 
     }
 
+    @Test(timeOut = 10_000)
+    public void testProducerCloseMemoryRelease() throws Exception {
+        initClientWithMemoryLimit();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerMemoryLimit")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(false)
+                .create();
+        this.stopBroker();
+        producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8));
+        producer.close();
+        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+        final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+        Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+    }
+
     private void initClientWithMemoryLimit() throws PulsarClientException {
         pulsarClient = PulsarClient.builder().
                 serviceUrl(lookupUrl.toString())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 439ceff..50f2d5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -884,6 +884,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     format("The producer %s of the topic %s was already closed when closing the producers",
                         producerName, topic));
                 pendingMessages.forEach(msg -> {
+                    client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
                     msg.sendComplete(ex);
                     msg.cmd.release();
                     msg.recycle();
@@ -907,6 +908,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     log.info("[{}] [{}] Closed Producer", topic, producerName);
                     setState(State.Closed);
                     pendingMessages.forEach(msg -> {
+                        client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
                         msg.cmd.release();
                         msg.recycle();
                     });