You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/06 16:15:30 UTC
[pulsar] branch master updated: Forget to update memory usage on
producer close (#11906)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ad9efae Forget to update memory usage on producer close (#11906)
ad9efae is described below
commit ad9efae1abf9675f830052ab1d4697330b23a750
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Sep 7 00:14:38 2021 +0800
Forget to update memory usage on producer close (#11906)
---
.../pulsar/client/impl/ProducerMemoryLimitTest.java | 20 +++++++++++++++++++-
.../org/apache/pulsar/client/impl/ProducerImpl.java | 2 ++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index b6ec6a5..264ec30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -59,7 +59,7 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
.create();
this.stopBroker();
try {
- producer.send("memroy-test".getBytes(StandardCharsets.UTF_8));
+ producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.TimeoutException ex) {
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
@@ -69,6 +69,24 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
}
+ @Test(timeOut = 10_000)
+ public void testProducerCloseMemoryRelease() throws Exception {
+ initClientWithMemoryLimit();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerMemoryLimit")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .enableBatching(false)
+ .create();
+ this.stopBroker();
+ producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8));
+ producer.close();
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+ Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+ }
+
private void initClientWithMemoryLimit() throws PulsarClientException {
pulsarClient = PulsarClient.builder().
serviceUrl(lookupUrl.toString())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 439ceff..50f2d5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -884,6 +884,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
format("The producer %s of the topic %s was already closed when closing the producers",
producerName, topic));
pendingMessages.forEach(msg -> {
+ client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
msg.sendComplete(ex);
msg.cmd.release();
msg.recycle();
@@ -907,6 +908,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
log.info("[{}] [{}] Closed Producer", topic, producerName);
setState(State.Closed);
pendingMessages.forEach(msg -> {
+ client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
msg.cmd.release();
msg.recycle();
});