You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/03 04:09:19 UTC

[pulsar] branch master updated: [fix][client]Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16915)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d95f6cf366f [fix][client]Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16915)
d95f6cf366f is described below

commit d95f6cf366f66bc1e38711bc59cd456c8f53f888
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Wed Aug 3 12:09:13 2022 +0800

    [fix][client]Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16915)
---
 .../apache/pulsar/client/impl/ProducerSemaphoreTest.java    | 13 ++++++++++++-
 .../pulsar/client/impl/BatchMessageContainerImpl.java       |  1 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 9507e9a5e2c..2bc81c48f9b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -66,7 +66,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
         ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
                 .topic("testProducerSemaphoreAcquire")
                 .maxPendingMessages(pendingQueueSize)
-                .enableBatching(false)
+                .enableBatching(true)
                 .create();
 
         this.stopBroker();
@@ -79,6 +79,17 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
         } catch (PulsarClientException.InvalidMessageException ex) {
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
         }
+
+        producer.conf.setBatchingEnabled(false);
+        try {
+            try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
+                mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
+                producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+            }
+            throw new IllegalStateException("can not reach here");
+        } catch (PulsarClientException.InvalidMessageException ex) {
+            Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        }
     }
 
     @Test(timeOut = 30000)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 0d107aa7ba9..7d95f0963ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -199,6 +199,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
     public OpSendMsg createOpSendMsg() throws IOException {
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
         if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            producer.semaphoreRelease(messages.size());
             messages.forEach(msg -> producer.client.getMemoryLimitController()
                     .releaseMemory(msg.getUncompressedSize()));
             discard(new PulsarClientException.InvalidMessageException(