You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:22:05 UTC

[pulsar] 11/18: Fix resource leak when create producer failed (#13505)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fc3ab2eaca4b7154b61b18aa360efc8d99a06999
Author: Aloys <lo...@gmail.com>
AuthorDate: Tue Dec 28 19:06:43 2021 +0800

    Fix resource leak when create producer failed (#13505)
    
    Fixes #13214
    
    ### Motivation
    
    When the client fails to create a producer because of a connection failure, a terminated topic, or a fenced producer, some resources are not released in the client.
    
    ### Modifications
    
    When producer creation fails:
    1. cancel the sendTimeout task
    2. cancel the batchTimerTask
    3. cancel the keyGeneratorTask
    4. cancel the stats timeout task
    
    (cherry picked from commit 57eccf48e418e0243f586a945134785dbe9b5d08)
---
 .../broker/service/ExclusiveProducerTest.java      | 28 +++++++++++++++
 .../broker/service/TopicTerminationTest.java       | 28 +++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 42 +++++++++++++---------
 3 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 04da55d..4d2b16a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 
 import org.apache.pulsar.client.api.Producer;
@@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
 import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase {
     }
 
     @Test(dataProvider = "topics")
+    public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boolean partitioned) throws Exception {
+        String topic = newTopic(type, partitioned);
+        Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .create();
+
+        try {
+            pulsarClient.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .accessMode(ProducerAccessMode.Exclusive)
+                    .create();
+            fail("Should have failed");
+        } catch (ProducerFencedException e) {
+            // Expected
+        }
+
+        p1.close();
+
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
+    }
+
+    @Test(dataProvider = "topics")
     public void existingSharedProducer(String type, boolean partitioned) throws Exception {
         String topic = newTopic(type, partitioned);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 0cd84f3..dedc990 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+
+import io.netty.util.HashedWheelTimer;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -41,8 +43,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -108,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase {
         }
     }
 
+    public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws Exception {
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        producer.send("msg-1".getBytes());
+        producer.send("msg-2".getBytes());
+        MessageId msgId3 = producer.send("msg-3".getBytes());
+
+        MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get();
+        assertEquals(lastMessageId, msgId3);
+        producer.close();
+
+        try {
+            pulsarClient.newProducer().topic(topicName).create();
+            fail("Should have thrown exception");
+        } catch (PulsarClientException.TopicTerminatedException e) {
+            // Expected
+        }
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
+    }
+
     @Test(timeOut = 20000)
     public void testTerminateWhilePublishing() throws Exception {
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5c22f13..31a32da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -872,23 +872,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return CompletableFuture.completedFuture(null);
         }
 
-        Timeout timeout = sendTimeout;
-        if (timeout != null) {
-            timeout.cancel();
-            sendTimeout = null;
-        }
-
-        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
-        if (batchTimerTask != null) {
-            batchTimerTask.cancel(false);
-            this.batchTimerTask = null;
-        }
-
-        if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
-            keyGeneratorTask.cancel(false);
-        }
-
-        stats.cancelStatsTimeout();
+        closeProducerTasks();
 
         ClientCnx cnx = cnx();
         if (cnx == null || currentState != State.Ready) {
@@ -1558,6 +1542,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                             failPendingMessages(cnx(), (PulsarClientException) cause);
                         }
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                     } else if (cause instanceof PulsarClientException.ProducerFencedException) {
                         setState(State.ProducerFenced);
@@ -1565,6 +1550,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                             failPendingMessages(cnx(), (PulsarClientException) cause);
                         }
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
                                (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause)
@@ -1575,6 +1561,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     } else {
                         setState(State.Failed);
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                         Timeout timeout = sendTimeout;
                         if (timeout != null) {
@@ -1599,6 +1586,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 } else {
                     log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId);
                 }
+                closeProducerTasks();
                 setState(State.Failed);
                 client.cleanupProducer(this);
             }
@@ -1607,6 +1595,26 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
+    private void closeProducerTasks() {
+        Timeout timeout = sendTimeout;
+        if (timeout != null) {
+            timeout.cancel();
+            sendTimeout = null;
+        }
+
+        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+        if (batchTimerTask != null) {
+            batchTimerTask.cancel(false);
+            this.batchTimerTask = null;
+        }
+
+        if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
+            keyGeneratorTask.cancel(false);
+        }
+
+        stats.cancelStatsTimeout();
+    }
+
     private void resendMessages(ClientCnx cnx, long expectedEpoch) {
         cnx.ctx().channel().eventLoop().execute(() -> {
             synchronized (this) {