You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:27 UTC

[pulsar] 23/33: [fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl (#16837)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c73269cda7855045428a3d53df09632cfbbde94
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Sat Jul 30 19:31:07 2022 +0800

    [fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl (#16837)
    
    ### Motivation
    
    Fix the client memory limit `currentUsage` leak in `ProducerImpl`. When our Pulsar cluster hit errors, the producer failed to send messages and we found that `currentUsage` stayed at a high value, as if it had leaked, which slowed down the producer's send rate.
    We also found that the producer semaphore is released twice when `createOpSendMsg` throws an exception.
    
    The first code location below releases only the message-count semaphore; it does not release the memory limit.
    **memory limit currentUsage leak point**
    https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033
    
    **producer semaphore release duplicated**
    https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120
    
    ```
    The memory limit leak occurred after the following exception.
    org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1
            at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287)
            at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826)
            at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889)
            at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369)
            at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816)
            at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848)
            at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
            at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
            at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
            at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
            at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:834)
    
    ```
    
    ### Modifications
    
    1. Add the missing `MemoryLimitController` release.
    
    ### Documentation
    
    - [X] `doc-not-needed`
    
    (cherry picked from commit 955dcd10ce28b996811e194c9ad852b06ab30aee)
---
 .../client/impl/ProducerMemoryLimitTest.java       | 29 +++++++++++++++++
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 36 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  5 ++-
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 0856dfc88b2..8bbfa19c509 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -96,6 +96,35 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
 
     }
 
+    @Test(timeOut = 10_000)
+    public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
+        initClientWithMemoryLimit();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerMemoryLimit")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                .batchingMaxBytes(12)
+                .create();
+        this.stopBroker();
+        try {
+            producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+            try {
+                producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
+            } catch (Exception e) {
+                throw PulsarClientException.unwrap(e);
+            }
+
+            throw new IllegalStateException("can not reach here");
+        } catch (PulsarClientException.TimeoutException ex) {
+            PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+            final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+            Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+        }
+    }
+
     @Test(timeOut = 10_000)
     public void testProducerCloseMemoryRelease() throws Exception {
         initClientWithMemoryLimit();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index cc7b601e42a..9507e9a5e2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -33,7 +37,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -229,4 +232,35 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
         Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
         Assert.assertFalse(producer.isErrorStat());
     }
+
+    @Test(timeOut = 10_000)
+    public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception {
+        final int pendingQueueSize = 10;
+        @Cleanup
+        ProducerImpl<byte[]> producer =
+                (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                        .topic("testProducerSemaphoreRelease")
+                        .sendTimeout(5, TimeUnit.SECONDS)
+                        .maxPendingMessages(pendingQueueSize)
+                        .enableBatching(true)
+                        .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
+                        .batchingMaxBytes(12)
+                        .create();
+        this.stopBroker();
+        try {
+            ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+            Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer)
+                    .encryptMessage(any(),any());
+
+            Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+            batchMessageContainerField.setAccessible(true);
+            BatchMessageContainerImpl batchMessageContainer = (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
+            batchMessageContainer.setProducer(spyProducer);
+            spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+
+            throw new IllegalStateException("can not reach here");
+        } catch (PulsarClientException.TimeoutException ex) {
+            Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 99197262585..9c25f269cee 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1959,8 +1959,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return;
         }
         final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
+        final long currentBatchSize = batchMessageContainer.getCurrentBatchSize();
         batchMessageContainer.discard(ex);
         semaphoreRelease(numMessagesInBatch);
+        client.getMemoryLimitController().releaseMemory(currentBatchSize);
     }
 
     @Override
@@ -2005,10 +2007,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 for (OpSendMsg opSendMsg : opSendMsgs) {
                     processOpSendMsg(opSendMsg);
                 }
-            } catch (PulsarClientException e) {
-                semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
             } catch (Throwable t) {
-                semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
                 log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
             }
         }