You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/13 01:01:23 UTC

[pulsar] branch master updated: [fix][client] Remove consumer when close consumer command is received (#15761)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5246c8e1cc4 [fix][client] Remove consumer when close consumer command is received (#15761)
5246c8e1cc4 is described below

commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 09:01:16 2022 +0800

    [fix][client] Remove consumer when close consumer command is received (#15761)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java     |  6 ++++--
 .../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 0e1709b709a..f37e820acb7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -115,7 +116,8 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
             ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -731,7 +733,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl<?> consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.remove(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 558c0bfa13f..a3a00b1b70e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -152,4 +153,23 @@ public class ClientCnxTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseConsumer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long consumerId = 1;
+        cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
+        assertEquals(cnx.consumers.size(), 1);
+
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
+                .setConsumerId(1);
+        cnx.handleCloseConsumer(closeConsumer);
+        assertEquals(cnx.consumers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }