You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/13 07:16:47 UTC
[pulsar] branch master updated: [fix][client] Remove producer when close producer command is received (#16028)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ef895af7d8 [fix][client] Remove producer when close producer command is received (#16028)
5ef895af7d8 is described below
commit 5ef895af7d8dec851167e56cdf3e8bec11080f8d
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 15:16:39 2022 +0800
[fix][client] Remove producer when close producer command is received (#16028)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 5 +++--
.../apache/pulsar/client/impl/ClientCnxTest.java | 24 +++++++++++++++++++---
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f37e820acb7..e0eee02e957 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -111,7 +111,8 @@ public class ClientCnx extends PulsarHandler {
// LookupRequests that waiting in client side.
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
- private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+ @VisibleForTesting
+ final ConcurrentLongHashMap<ProducerImpl<?>> producers =
ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
@@ -721,7 +722,7 @@ public class ClientCnx extends PulsarHandler {
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
final long producerId = closeProducer.getProducerId();
- ProducerImpl<?> producer = producers.get(producerId);
+ ProducerImpl<?> producer = producers.remove(producerId);
if (producer != null) {
producer.connectionClosed(this);
} else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a3a00b1b70e..6ce4afecd02 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public class ClientCnxTest {
@Test
public void testHandleCloseConsumer() {
- ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+ ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
ClientCnx cnx = new ClientCnx(conf, eventLoop);
@@ -165,11 +166,28 @@ public class ClientCnxTest {
cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
assertEquals(cnx.consumers.size(), 1);
- CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
- .setConsumerId(1);
+ CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId);
cnx.handleCloseConsumer(closeConsumer);
assertEquals(cnx.consumers.size(), 0);
eventLoop.shutdownGracefully();
}
+
+ @Test
+ public void testHandleCloseProducer() {
+ ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+ ClientConfigurationData conf = new ClientConfigurationData();
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ long producerId = 1;
+ cnx.registerProducer(producerId, mock(ProducerImpl.class));
+ assertEquals(cnx.producers.size(), 1);
+
+ CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId);
+ cnx.handleCloseProducer(closeProducerCmd);
+ assertEquals(cnx.producers.size(), 0);
+
+ eventLoop.shutdownGracefully();
+ }
}