You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 07:15:24 UTC
[pulsar] 02/02: [fix][broker]Fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message exception (#17276)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 58946da1f4e09bfec4e67ca39416123beefcc700
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Mon Sep 5 10:35:30 2022 +0800
[fix][broker]Fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message exception (#17276)
* fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message
* fix unit test
Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
.../client/impl/ProducerMemoryLimitTest.java | 40 ++++++++++++++++++++++
.../pulsar/client/impl/ProducerSemaphoreTest.java | 38 ++++++++++++++++++++
.../client/impl/BatchMessageContainerImpl.java | 2 ++
.../client/impl/BatchMessageContainerImplTest.java | 13 +++++++
4 files changed, 93 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 8bbfa19c509..2a981fcb6df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import io.netty.buffer.ByteBufAllocator;
+import java.lang.reflect.Field;
import lombok.Cleanup;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
@@ -125,6 +130,41 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
}
}
+ @Test(timeOut = 10_000)
+ public void testBatchMessageOOMMemoryRelease() throws Exception {
+ initClientWithMemoryLimit();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerMemoryLimit")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(12)
+ .create();
+ this.stopBroker();
+
+ try {
+ ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+ final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
+ doAnswer((ignore) -> {
+ throw new OutOfMemoryError("memory-test");
+ }).when(mockAllocator).buffer(anyInt());
+
+ final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+ Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+ batchMessageContainerField.setAccessible(true);
+ batchMessageContainerField.set(spyProducer, batchMessageContainer);
+
+ spyProducer.send("memory-test".getBytes(StandardCharsets.UTF_8));
+ Assert.fail("can not reach here");
+ } catch (PulsarClientException ex) {
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+ Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+ }
+ }
+
@Test(timeOut = 10_000)
public void testProducerCloseMemoryRelease() throws Exception {
initClientWithMemoryLimit();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 2bc81c48f9b..f04f94d11b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -19,6 +19,10 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@@ -274,4 +278,38 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
}
}
+
+ @Test(timeOut = 10_000)
+ public void testBatchMessageOOMProducerSemaphoreRelease() throws Exception {
+ final int pendingQueueSize = 10;
+ @Cleanup
+ ProducerImpl<byte[]> producer =
+ (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerSemaphoreRelease")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(pendingQueueSize)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(12)
+ .create();
+ this.stopBroker();
+
+ try {
+ ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+ final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
+ doAnswer((ignore) -> {
+ throw new OutOfMemoryError("semaphore-test");
+ }).when(mockAllocator).buffer(anyInt());
+
+ final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+ Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+ batchMessageContainerField.setAccessible(true);
+ batchMessageContainerField.set(spyProducer, batchMessageContainer);
+
+ spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+ Assert.fail("can not reach here");
+ } catch (PulsarClientException ex) {
+ Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 44fad489dac..43c229d6e81 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -108,6 +108,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
+ producer.semaphoreRelease(getNumMessagesInBatch());
+ producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize());
discard(new PulsarClientException(e));
return false;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index a4f0205c2cf..1e640301f89 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.junit.Assert;
import org.testng.annotations.Test;
public class BatchMessageContainerImplTest {
@@ -41,6 +43,17 @@ public class BatchMessageContainerImplTest {
final ProducerImpl<?> producer = mock(ProducerImpl.class);
final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ MemoryLimitController memoryLimitController = mock(MemoryLimitController.class);
+ when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
+ try {
+ Field clientFiled = HandlerState.class.getDeclaredField("client");
+ clientFiled.setAccessible(true);
+ clientFiled.set(producer, pulsarClient);
+ } catch (Exception e){
+ Assert.fail(e.getMessage());
+ }
+
when(producer.getConfiguration()).thenReturn(producerConfigurationData);
final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
doAnswer((ignore) -> {