You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/13 03:26:39 UTC

[pulsar] branch branch-2.9 updated: [fix][client] Remove consumer when close consumer command is received (#15761)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 7349c23eeda [fix][client] Remove consumer when close consumer command is received (#15761)
7349c23eeda is described below

commit 7349c23eeda45c170e17ef6f267e6c9d1084606b
Author: mattison chao <ma...@gmail.com>
AuthorDate: Mon Jun 13 11:26:24 2022 +0800

    [fix][client] Remove consumer when close consumer command is received (#15761)
    
    (cherry picked from commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java     |  7 ++++---
 .../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 20325ade4f6..52d10ad996e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -120,7 +120,8 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
             ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -737,7 +738,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl<?> consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.remove(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 558c0bfa13f..a3a00b1b70e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -152,4 +153,23 @@ public class ClientCnxTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    /** Verifies that handleCloseConsumer removes the closed consumer from the connection. */
+    @Test
+    public void testHandleCloseConsumer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        try {
+            ClientCnx cnx = new ClientCnx(new ClientConfigurationData(), eventLoop);
+            long consumerId = 1;
+            cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
+            assertEquals(cnx.consumers.size(), 1);
+            // Reuse consumerId so the close command targets the consumer registered above.
+            cnx.handleCloseConsumer(new CommandCloseConsumer().setConsumerId(consumerId));
+            assertEquals(cnx.consumers.size(), 0);
+        } finally {
+            // Shut the event loop down even when an assertion above fails.
+            eventLoop.shutdownGracefully();
+        }
+    }
 }