You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:56:14 UTC

[pulsar] 06/13: Fix semaphore and memory leak when chunks failed to enqueue (#13454)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ef9e773ef454db3db4ed41de20858c2c0b2dadaf
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Dec 24 10:56:17 2021 +0800

    Fix semaphore and memory leak when chunks failed to enqueue (#13454)
    
    ### Motivation
    
    When a large message is sent in chunks, each chunk needs to reserve a permit from the semaphore. However, when reserving a permit for a later chunk fails, the memory already reserved from the memory limiter and the permits already acquired from the semaphore are not released.
    
    ### Modifications
    
    - Release the semaphore and memory when `canEnqueueRequest` returns false for chunks.
    - Add `testChunksEnqueueFailed` to cover this case. It sends a large message whose number of chunks is greater than `maxPendingMessages`, so that the first `canEnqueueRequest` call returns true while a subsequent `canEnqueueRequest` call returns false.
    
    (cherry picked from commit 2e2cd57f984b601a878dd11e1d27f4c169a84e5b)
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 43 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  2 +
 2 files changed, 45 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index d4eab77..40191ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
@@ -27,17 +28,20 @@ import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -47,6 +51,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -369,6 +374,44 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         producer = null; // clean reference of mocked producer
     }
 
+    @Test
+    public void testChunksEnqueueFailed() throws Exception { // a failed chunk enqueue must leak nothing
+        final String topicName = "persistent://my-property/my-ns/test-chunks-enqueue-failed";
+        log.info("-- Starting {} test --", methodName);
+        this.conf.setMaxMessageSize(5);
+        // Baseline: no client memory is reserved before anything has been sent.
+        final MemoryLimitController controller = ((PulsarClientImpl) pulsarClient).getMemoryLimitController();
+        assertEquals(controller.currentUsage(), 0);
+        // Keep the permit count below the chunk count of the 100-character message sent further down.
+        final int maxPendingMessages = 10;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .maxPendingMessages(maxPendingMessages)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+        assertTrue(producer instanceof ProducerImpl);
+        Semaphore semaphore = ((ProducerImpl<byte[]>) producer).getSemaphore().orElse(null);
+        assertNotNull(semaphore);
+        assertEquals(semaphore.availablePermits(), maxPendingMessages);
+        producer.send(createMessagePayload(1).getBytes(StandardCharsets.UTF_8));
+        try {
+            producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8));
+            fail("It should fail with ProducerQueueIsFullError");
+        } catch (PulsarClientException e) {
+            assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError);
+            assertEquals(controller.currentUsage(), 0);
+            assertEquals(semaphore.availablePermits(), maxPendingMessages);
+        }
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.memoryLimit(10000L, SizeUnit.BYTES); // caps the client memory limit at 10000 bytes
+    }
+
     private String createMessagePayload(int size) {
         StringBuilder str = new StringBuilder();
         Random rand = new Random();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3afd660..a5baba7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -456,6 +456,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         // chunked message also sent individually so, try to acquire send-permits
         for (int i = 0; i < (totalChunks - 1); i++) {
             if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
+                client.getMemoryLimitController().releaseMemory(uncompressedSize);
+                semaphoreRelease(i + 1);
                 return;
             }
         }