You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:22:05 UTC
[pulsar] 11/18: Fix resource leak when create producer failed (#13505)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fc3ab2eaca4b7154b61b18aa360efc8d99a06999
Author: Aloys <lo...@gmail.com>
AuthorDate: Tue Dec 28 19:06:43 2021 +0800
Fix resource leak when create producer failed (#13505)
Fixes #13214
### Motivation
When the client fails to create a producer because the connection failed, the topic was terminated, or the producer was fenced, some resources are not released in the client.
### Modifications
When producer creation fails:
1. stop the sendTimeout task
2. cancel the batchTimerTask
3. cancel the keyGeneratorTask
4. cancel the statsTimeout task
(cherry picked from commit 57eccf48e418e0243f586a945134785dbe9b5d08)
---
.../broker/service/ExclusiveProducerTest.java | 28 +++++++++++++++
.../broker/service/TopicTerminationTest.java | 28 +++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 42 +++++++++++++---------
3 files changed, 81 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 04da55d..4d2b16a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Producer;
@@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase {
}
@Test(dataProvider = "topics")
+ public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boolean partitioned) throws Exception {
+ String topic = newTopic(type, partitioned);
+ Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .create();
+
+ try {
+ pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .create();
+ fail("Should have failed");
+ } catch (ProducerFencedException e) {
+ // Expected
+ }
+
+ p1.close();
+
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
+ }
+
+ @Test(dataProvider = "topics")
public void existingSharedProducer(String type, boolean partitioned) throws Exception {
String topic = newTopic(type, partitioned);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 0cd84f3..dedc990 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+
+import io.netty.util.HashedWheelTimer;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -41,8 +43,10 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -108,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase {
}
}
+ public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws Exception {
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ producer.send("msg-1".getBytes());
+ producer.send("msg-2".getBytes());
+ MessageId msgId3 = producer.send("msg-3".getBytes());
+
+ MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get();
+ assertEquals(lastMessageId, msgId3);
+ producer.close();
+
+ try {
+ pulsarClient.newProducer().topic(topicName).create();
+ fail("Should have thrown exception");
+ } catch (PulsarClientException.TopicTerminatedException e) {
+ // Expected
+ }
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
+ }
+
@Test(timeOut = 20000)
public void testTerminateWhilePublishing() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5c22f13..31a32da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -872,23 +872,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return CompletableFuture.completedFuture(null);
}
- Timeout timeout = sendTimeout;
- if (timeout != null) {
- timeout.cancel();
- sendTimeout = null;
- }
-
- ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
- if (batchTimerTask != null) {
- batchTimerTask.cancel(false);
- this.batchTimerTask = null;
- }
-
- if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
- keyGeneratorTask.cancel(false);
- }
-
- stats.cancelStatsTimeout();
+ closeProducerTasks();
ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
@@ -1558,6 +1542,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
failPendingMessages(cnx(), (PulsarClientException) cause);
}
producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
client.cleanupProducer(this);
} else if (cause instanceof PulsarClientException.ProducerFencedException) {
setState(State.ProducerFenced);
@@ -1565,6 +1550,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
failPendingMessages(cnx(), (PulsarClientException) cause);
}
producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
client.cleanupProducer(this);
} else if (producerCreatedFuture.isDone() || //
(cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause)
@@ -1575,6 +1561,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
} else {
setState(State.Failed);
producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
client.cleanupProducer(this);
Timeout timeout = sendTimeout;
if (timeout != null) {
@@ -1599,6 +1586,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
} else {
log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId);
}
+ closeProducerTasks();
setState(State.Failed);
client.cleanupProducer(this);
}
@@ -1607,6 +1595,26 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
+ private void closeProducerTasks() {
+ Timeout timeout = sendTimeout;
+ if (timeout != null) {
+ timeout.cancel();
+ sendTimeout = null;
+ }
+
+ ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+ if (batchTimerTask != null) {
+ batchTimerTask.cancel(false);
+ this.batchTimerTask = null;
+ }
+
+ if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
+ keyGeneratorTask.cancel(false);
+ }
+
+ stats.cancelStatsTimeout();
+ }
+
private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (this) {