You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:27 UTC
[pulsar] 23/33: [fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl (#16837)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7c73269cda7855045428a3d53df09632cfbbde94
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Sat Jul 30 19:31:07 2022 +0800
[fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl (#16837)
### Motivation
Fix the client memory limit `currentUsage` leak in `ProducerImpl`. When our Pulsar cluster encountered errors, producer sends failed and we found that `currentUsage` stayed at a high value, as if memory had leaked, which caused the producer send rate to slow down.
We also found that the producer semaphore is released twice when `createOpSendMsg` throws an exception.
The first code location below releases only the message-count semaphore; it does not release the memory reserved against the memory limit.
**memory limit currentUsage leak point**
https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033
**producer semaphore release duplicated**
https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120
```
After the following exception, the memory limit leak occurred:
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1
at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287)
at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826)
at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889)
at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369)
at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816)
at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848)
at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
```
### Modifications
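1. Add the missing `MemoryLimitController` release: when the batch message container is discarded after a failure, release the memory of the current batch in addition to the message-count semaphore.
2. Remove the duplicated `semaphoreRelease` calls from the exception handlers around the creation of `OpSendMsg` from the batch message container.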
### Documentation
- [X] `doc-not-needed`
(cherry picked from commit 955dcd10ce28b996811e194c9ad852b06ab30aee)
---
.../client/impl/ProducerMemoryLimitTest.java | 29 +++++++++++++++++
.../pulsar/client/impl/ProducerSemaphoreTest.java | 36 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ProducerImpl.java | 5 ++-
3 files changed, 66 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 0856dfc88b2..8bbfa19c509 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -96,6 +96,35 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
}
+ @Test(timeOut = 10_000)
+ public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
+ initClientWithMemoryLimit();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerMemoryLimit")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(12)
+ .create();
+ this.stopBroker();
+ try {
+ producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+ try {
+ producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
+ }
+
+ throw new IllegalStateException("can not reach here");
+ } catch (PulsarClientException.TimeoutException ex) {
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+ Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+ }
+ }
+
@Test(timeOut = 10_000)
public void testProducerCloseMemoryRelease() throws Exception {
initClientWithMemoryLimit();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index cc7b601e42a..9507e9a5e2c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -33,7 +37,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -229,4 +232,35 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
Assert.assertFalse(producer.isErrorStat());
}
+
+ @Test(timeOut = 10_000)
+ public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception {
+ final int pendingQueueSize = 10;
+ @Cleanup
+ ProducerImpl<byte[]> producer =
+ (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerSemaphoreRelease")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(pendingQueueSize)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(12)
+ .create();
+ this.stopBroker();
+ try {
+ ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+ Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer)
+ .encryptMessage(any(),any());
+
+ Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+ batchMessageContainerField.setAccessible(true);
+ BatchMessageContainerImpl batchMessageContainer = (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
+ batchMessageContainer.setProducer(spyProducer);
+ spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+
+ throw new IllegalStateException("can not reach here");
+ } catch (PulsarClientException.TimeoutException ex) {
+ Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 99197262585..9c25f269cee 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1959,8 +1959,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return;
}
final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
+ final long currentBatchSize = batchMessageContainer.getCurrentBatchSize();
batchMessageContainer.discard(ex);
semaphoreRelease(numMessagesInBatch);
+ client.getMemoryLimitController().releaseMemory(currentBatchSize);
}
@Override
@@ -2005,10 +2007,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
for (OpSendMsg opSendMsg : opSendMsgs) {
processOpSendMsg(opSendMsg);
}
- } catch (PulsarClientException e) {
- semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
} catch (Throwable t) {
- semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
}
}