You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:37 UTC

[pulsar] 04/09: [Issue 11936] forget to call SendCallback on producer close (#11939)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1703affb54cc2443dcb1bc185c5b2af7859710af
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Sep 7 23:41:06 2021 +0800

    [Issue 11936] forget to call SendCallback on producer close (#11939)
    
    * forget to call SendCallback on producer close
    
    * add unit tests
    
    * add unit tests
    
    (cherry picked from commit d494c43cc5cb249a7d139a0ee1a600103805bb85)
---
 .../pulsar/client/impl/ProducerCloseTest.java      | 82 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 31 ++++----
 2 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
new file mode 100644
index 0000000..0c4df15
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/** Verifies that closing a producer still completes sends that were pending at close time (issue 11936). */
+@Test(groups = "broker-impl")
+public class ProducerCloseTest extends ProducerConsumerBase {
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** Sends one message, closes the producer, and expects the send future to complete within 3 seconds. */
+    @Test(timeOut = 10_000)
+    public void testProducerCloseCallback() throws Exception {
+        initClient();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerClose")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(false)
+                .create();
+        final TypedMessageBuilder<byte[]> value =
+                producer.newMessage().value("test-msg".getBytes(StandardCharsets.UTF_8));
+        // Stop automatic socket reads; presumably this keeps the broker's replies from completing the send.
+        producer.getClientCnx().channel().config().setAutoRead(false);
+        final CompletableFuture<MessageId> completableFuture = value.sendAsync();
+        producer.closeAsync();
+        // Hand-feed a success reply; newRequestId() - 1 is presumably the id the close request was sent with.
+        final CommandSuccess commandSuccess = new CommandSuccess();
+        commandSuccess.setRequestId(((PulsarClientImpl) pulsarClient).newRequestId() - 1);
+        producer.getClientCnx().handleSuccess(commandSuccess);
+        // Bounded wait instead of a fixed sleep: handle() absorbs a failed send, get() throws on timeout.
+        completableFuture.handle((messageId, error) -> null).get(3, TimeUnit.SECONDS);
+        Assert.assertTrue(completableFuture.isDone());
+    }
+
+    private void initClient() throws PulsarClientException {
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e531345..84062fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -871,16 +871,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             synchronized (this) {
                 setState(State.Closed);
                 client.cleanupProducer(this);
-                PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
-                    format("The producer %s of the topic %s was already closed when closing the producers",
-                        producerName, topic));
-                pendingMessages.forEach(msg -> {
-                    client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
-                    msg.sendComplete(ex);
-                    msg.cmd.release();
-                    msg.recycle();
-                });
-                pendingMessages.clear();
+                clearPendingMessagesWhenClose();
             }
 
             return CompletableFuture.completedFuture(null);
@@ -898,12 +889,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 synchronized (ProducerImpl.this) {
                     log.info("[{}] [{}] Closed Producer", topic, producerName);
                     setState(State.Closed);
-                    pendingMessages.forEach(msg -> {
-                        client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
-                        msg.cmd.release();
-                        msg.recycle();
-                    });
-                    pendingMessages.clear();
+                    clearPendingMessagesWhenClose();
                 }
 
                 closeFuture.complete(null);
@@ -918,6 +904,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return closeFuture;
     }
 
+    /** Fails every pending message with AlreadyClosedException, releases it, then clears pendingMessages. */
+    private void clearPendingMessagesWhenClose() {
+        final PulsarClientException closedError = new PulsarClientException.AlreadyClosedException(format(
+            "The producer %s of the topic %s was already closed when closing the producers", producerName, topic));
+        pendingMessages.forEach(pending -> {
+            client.getMemoryLimitController().releaseMemory(pending.uncompressedSize);
+            pending.sendComplete(closedError);
+            pending.cmd.release();
+            pending.recycle();
+        });
+        pendingMessages.clear();
+    }
+
     @Override
     public boolean isConnected() {
         return connectionHandler.cnx() != null && (getState() == State.Ready);