You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:37 UTC
[pulsar] 04/09: [Issue 11936] forget to call SendCallback on
producer close (#11939)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1703affb54cc2443dcb1bc185c5b2af7859710af
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Sep 7 23:41:06 2021 +0800
[Issue 11936] forget to call SendCallback on producer close (#11939)
* forget to call SendCallback on producer close
* add unit tests
* add unit tests
(cherry picked from commit d494c43cc5cb249a7d139a0ee1a600103805bb85)
---
.../pulsar/client/impl/ProducerCloseTest.java | 82 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 31 ++++----
2 files changed, 97 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
new file mode 100644
index 0000000..0c4df15
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker-impl")
+public class ProducerCloseTest extends ProducerConsumerBase {
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProducerCloseCallback() throws Exception {
+ initClient();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerClose")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .enableBatching(false)
+ .create();
+ final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+ final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8));
+ producer.getClientCnx().channel().config().setAutoRead(false);
+ final CompletableFuture<MessageId> completableFuture = value.sendAsync();
+ producer.closeAsync();
+ final CommandSuccess commandSuccess = new CommandSuccess();
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ commandSuccess.setRequestId(clientImpl.newRequestId() -1);
+ producer.getClientCnx().handleSuccess(commandSuccess);
+ Thread.sleep(3000);
+ Assert.assertEquals(completableFuture.isDone(), true);
+ }
+
+ private void initClient() throws PulsarClientException {
+ pulsarClient = PulsarClient.builder().
+ serviceUrl(lookupUrl.toString())
+ .build();
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e531345..84062fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -871,16 +871,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
synchronized (this) {
setState(State.Closed);
client.cleanupProducer(this);
- PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
- format("The producer %s of the topic %s was already closed when closing the producers",
- producerName, topic));
- pendingMessages.forEach(msg -> {
- client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
- msg.sendComplete(ex);
- msg.cmd.release();
- msg.recycle();
- });
- pendingMessages.clear();
+ clearPendingMessagesWhenClose();
}
return CompletableFuture.completedFuture(null);
@@ -898,12 +889,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
synchronized (ProducerImpl.this) {
log.info("[{}] [{}] Closed Producer", topic, producerName);
setState(State.Closed);
- pendingMessages.forEach(msg -> {
- client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
- msg.cmd.release();
- msg.recycle();
- });
- pendingMessages.clear();
+ clearPendingMessagesWhenClose();
}
closeFuture.complete(null);
@@ -918,6 +904,19 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return closeFuture;
}
+ private void clearPendingMessagesWhenClose() {
+ PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
+ format("The producer %s of the topic %s was already closed when closing the producers",
+ producerName, topic));
+ pendingMessages.forEach(msg -> {
+ client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
+ msg.sendComplete(ex);
+ msg.cmd.release();
+ msg.recycle();
+ });
+ pendingMessages.clear();
+ }
+
@Override
public boolean isConnected() {
return connectionHandler.cnx() != null && (getState() == State.Ready);