You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/06/13 03:26:39 UTC
[pulsar] branch branch-2.9 updated: [fix][client] Remove consumer when close consumer command is received (#15761)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 7349c23eeda [fix][client] Remove consumer when close consumer command is received (#15761)
7349c23eeda is described below
commit 7349c23eeda45c170e17ef6f267e6c9d1084606b
Author: mattison chao <ma...@gmail.com>
AuthorDate: Mon Jun 13 11:26:24 2022 +0800
[fix][client] Remove consumer when close consumer command is received (#15761)
(cherry picked from commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 7 ++++---
.../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 20325ade4f6..52d10ad996e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
@@ -120,7 +120,8 @@ public class ClientCnx extends PulsarHandler {
.expectedItems(16)
.concurrencyLevel(1)
.build();
- private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+ @VisibleForTesting
+ final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
@@ -737,7 +738,7 @@ public class ClientCnx extends PulsarHandler {
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
final long consumerId = closeConsumer.getConsumerId();
- ConsumerImpl<?> consumer = consumers.get(consumerId);
+ ConsumerImpl<?> consumer = consumers.remove(consumerId);
if (consumer != null) {
consumer.connectionClosed(this);
} else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 558c0bfa13f..a3a00b1b70e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
@@ -152,4 +153,23 @@ public class ClientCnxTest {
eventLoop.shutdownGracefully();
}
+
+ @Test
+ public void testHandleCloseConsumer() {
+ ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+ ClientConfigurationData conf = new ClientConfigurationData();
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ long consumerId = 1;
+ cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
+ assertEquals(cnx.consumers.size(), 1);
+
+ CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
+ .setConsumerId(1);
+ cnx.handleCloseConsumer(closeConsumer);
+ assertEquals(cnx.consumers.size(), 0);
+
+ eventLoop.shutdownGracefully();
+ }
}