You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:10 UTC

[pulsar] 05/29: [fix][client] Remove producer when close producer command is received (#16028)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb60a55edf0cea48074a3a9a30510dbbb2cda240
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 15:16:39 2022 +0800

    [fix][client] Remove producer when close producer command is received (#16028)
    
    (cherry picked from commit 5ef895af7d8dec851167e56cdf3e8bec11080f8d)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  5 +++--
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 24 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 38a508bb716..322138699d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -111,7 +111,8 @@ public class ClientCnx extends PulsarHandler {
     // LookupRequests that waiting in client side.
     private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
 
-    private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ProducerImpl<?>> producers =
             ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -721,7 +722,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
         final long producerId = closeProducer.getProducerId();
-        ProducerImpl<?> producer = producers.get(producerId);
+        ProducerImpl<?> producer = producers.remove(producerId);
         if (producer != null) {
             producer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a3a00b1b70e..6ce4afecd02 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public class ClientCnxTest {
 
     @Test
     public void testHandleCloseConsumer() {
-        ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
         ClientConfigurationData conf = new ClientConfigurationData();
         ClientCnx cnx = new ClientCnx(conf, eventLoop);
@@ -165,11 +166,28 @@ public class ClientCnxTest {
         cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
         assertEquals(cnx.consumers.size(), 1);
 
-        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
-                .setConsumerId(1);
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId);
         cnx.handleCloseConsumer(closeConsumer);
         assertEquals(cnx.consumers.size(), 0);
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseProducer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long producerId = 1;
+        cnx.registerProducer(producerId, mock(ProducerImpl.class));
+        assertEquals(cnx.producers.size(), 1);
+
+        CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId);
+        cnx.handleCloseProducer(closeProducerCmd);
+        assertEquals(cnx.producers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }