You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:56:14 UTC
[pulsar] 06/13: Fix semaphore and memory leak when chunks failed to enqueue (#13454)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ef9e773ef454db3db4ed41de20858c2c0b2dadaf
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Dec 24 10:56:17 2021 +0800
Fix semaphore and memory leak when chunks failed to enqueue (#13454)
### Motivation
When a large message is sent in chunks, each chunk needs to reserve a permit from the semaphore. However, when reserving a permit for a later chunk fails, the memory already reserved from the memory limiter and the permits already acquired from the semaphore are not released.
### Modifications
- Release the semaphore and memory when `canEnqueueRequest` returns false for chunks.
- Add `testChunksEnqueueFailed` to cover this case. It sends a large message whose number of chunks is greater than `maxPendingMessages`, so that the first call to `canEnqueueRequest` returns true while one of the following `canEnqueueRequest` calls returns false.
(cherry picked from commit 2e2cd57f984b601a878dd11e1d27f4c169a84e5b)
---
.../pulsar/client/impl/MessageChunkingTest.java | 43 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 2 +
2 files changed, 45 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index d4eab77..40191ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
@@ -27,17 +28,20 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -47,6 +51,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -369,6 +374,44 @@ public class MessageChunkingTest extends ProducerConsumerBase {
producer = null; // clean reference of mocked producer
}
+ // Verifies that when a chunked message cannot be fully enqueued, the send fails with
+ // ProducerQueueIsFullError and leaves both the client memory limiter usage and the
+ // producer semaphore permits at their initial values (i.e. nothing is leaked).
+ @Test
+ public void testChunksEnqueueFailed() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/test-chunks-enqueue-failed";
+ log.info("-- Starting {} test --", methodName);
+ // A tiny max message size so that the payload sent below is split into more chunks
+ // than maxPendingMessages (see the commit description).
+ this.conf.setMaxMessageSize(5);
+
+ final MemoryLimitController controller = ((PulsarClientImpl) pulsarClient).getMemoryLimitController();
+ assertEquals(controller.currentUsage(), 0);
+
+ final int maxPendingMessages = 10;
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .maxPendingMessages(maxPendingMessages)
+ .enableChunking(true)
+ .enableBatching(false)
+ .create();
+ assertTrue(producer instanceof ProducerImpl);
+ Semaphore semaphore = ((ProducerImpl<byte[]>) producer).getSemaphore().orElse(null);
+ assertNotNull(semaphore);
+ assertEquals(semaphore.availablePermits(), maxPendingMessages);
+ // Use an explicit charset, consistent with the chunked send below, instead of the platform default.
+ producer.send(createMessagePayload(1).getBytes(StandardCharsets.UTF_8));
+ try {
+ producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8));
+ fail("It should fail with ProducerQueueIsFullError");
+ } catch (PulsarClientException e) {
+ assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError);
+ // After the failed send, no memory and no semaphore permits may remain reserved.
+ assertEquals(controller.currentUsage(), 0);
+ assertEquals(semaphore.availablePermits(), maxPendingMessages);
+ }
+ }
+
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+ clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
+ }
+
private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3afd660..a5baba7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -456,6 +456,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
// chunked message also sent individually so, try to acquire send-permits
for (int i = 0; i < (totalChunks - 1); i++) {
if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+ client.getMemoryLimitController().releaseMemory(uncompressedSize);
+ semaphoreRelease(i + 1);
return;
}
}