You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 07:15:24 UTC

[pulsar] 02/02: [fix][broker]Fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message exception (#17276)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 58946da1f4e09bfec4e67ca39416123beefcc700
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Mon Sep 5 10:35:30 2022 +0800

    [fix][broker]Fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message exception (#17276)
    
    * fix memoryLimitController currentUsage and MaxQueueSize semaphore leaks when batchMessageContainer fails to add a message
    
    * fix unit test
    
    Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
 .../client/impl/ProducerMemoryLimitTest.java       | 40 ++++++++++++++++++++++
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 38 ++++++++++++++++++++
 .../client/impl/BatchMessageContainerImpl.java     |  2 ++
 .../client/impl/BatchMessageContainerImplTest.java | 13 +++++++
 4 files changed, 93 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 8bbfa19c509..2a981fcb6df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import io.netty.buffer.ByteBufAllocator;
+import java.lang.reflect.Field;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -125,6 +130,41 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 10_000)
+    public void testBatchMessageOOMMemoryRelease() throws Exception {
+        initClientWithMemoryLimit();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerMemoryLimit")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                .batchingMaxBytes(12)
+                .create();
+        this.stopBroker();
+
+        try {
+            ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+            final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
+            doAnswer((ignore) -> { // every buffer(int) call on the mock throws OutOfMemoryError
+                throw new OutOfMemoryError("memory-test");
+            }).when(mockAllocator).buffer(anyInt());
+
+            final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+            Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+            batchMessageContainerField.setAccessible(true); // reflection is used to replace the spy's container
+            batchMessageContainerField.set(spyProducer, batchMessageContainer);
+
+            spyProducer.send("memory-test".getBytes(StandardCharsets.UTF_8));
+            Assert.fail("can not reach here"); // send(...) is expected to throw before this line
+        } catch (PulsarClientException ex) {
+            PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+            final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+            Assert.assertEquals(memoryLimitController.currentUsage(), 0); // usage must be back to 0 after the failure
+        }
+    }
+
     @Test(timeOut = 10_000)
     public void testProducerCloseMemoryRelease() throws Exception {
         initClientWithMemoryLimit();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 2bc81c48f9b..f04f94d11b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -19,6 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
@@ -274,4 +278,38 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
             Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
         }
     }
+
+    @Test(timeOut = 10_000)
+    public void testBatchMessageOOMProducerSemaphoreRelease() throws Exception {
+        final int pendingQueueSize = 10; // permit count the semaphore is expected to report at the end
+        @Cleanup
+        ProducerImpl<byte[]> producer =
+                (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                        .topic("testProducerSemaphoreRelease")
+                        .sendTimeout(5, TimeUnit.SECONDS)
+                        .maxPendingMessages(pendingQueueSize)
+                        .enableBatching(true)
+                        .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
+                        .batchingMaxBytes(12)
+                        .create();
+        this.stopBroker();
+
+        try {
+            ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+            final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
+            doAnswer((ignore) -> { // every buffer(int) call on the mock throws OutOfMemoryError
+                throw new OutOfMemoryError("semaphore-test");
+            }).when(mockAllocator).buffer(anyInt());
+
+            final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+            Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+            batchMessageContainerField.setAccessible(true); // reflection is used to replace the spy's container
+            batchMessageContainerField.set(spyProducer, batchMessageContainer);
+
+            spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+            Assert.fail("can not reach here"); // send(...) is expected to throw before this line
+        } catch (PulsarClientException ex) { // after the failed send, all permits must be available again
+            Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 44fad489dac..43c229d6e81 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -108,6 +108,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
                 }
             } catch (Throwable e) {
                 log.error("construct first message failed, exception is ", e);
+                producer.semaphoreRelease(getNumMessagesInBatch());
+                producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize());
                 discard(new PulsarClientException(e));
                 return false;
             }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index a4f0205c2cf..1e640301f89 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBufAllocator;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.junit.Assert;
 import org.testng.annotations.Test;
 
 public class BatchMessageContainerImplTest {
@@ -41,6 +43,17 @@ public class BatchMessageContainerImplTest {
         final ProducerImpl<?> producer = mock(ProducerImpl.class);
         final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
         producerConfigurationData.setCompressionType(CompressionType.NONE);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        MemoryLimitController memoryLimitController = mock(MemoryLimitController.class);
+        when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
+        try {
+            Field clientFiled = HandlerState.class.getDeclaredField("client");
+            clientFiled.setAccessible(true);
+            clientFiled.set(producer, pulsarClient);
+        } catch (Exception e){
+            Assert.fail(e.getMessage());
+        }
+
         when(producer.getConfiguration()).thenReturn(producerConfigurationData);
         final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
         doAnswer((ignore) -> {