You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/02 03:04:51 UTC
[pulsar] branch branch-2.9 updated: [fix][client] Remove producer when close producer command is received (#16028)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4fb769e0283 [fix][client] Remove producer when close producer command is received (#16028)
4fb769e0283 is described below
commit 4fb769e0283b539554f7d9d7e3c8d474cef27bfe
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 15:16:39 2022 +0800
[fix][client] Remove producer when close producer command is received (#16028)
(cherry picked from commit 5ef895af7d8dec851167e56cdf3e8bec11080f8d)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 5 +++--
.../apache/pulsar/client/impl/ClientCnxTest.java | 24 +++++++++++++++++++---
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 52d10ad996e..e8aed1abb55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -115,7 +115,8 @@ public class ClientCnx extends PulsarHandler {
// LookupRequests that waiting in client side.
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
- private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+ @VisibleForTesting
+ final ConcurrentLongHashMap<ProducerImpl<?>> producers =
ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
@@ -726,7 +727,7 @@ public class ClientCnx extends PulsarHandler {
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
final long producerId = closeProducer.getProducerId();
- ProducerImpl<?> producer = producers.get(producerId);
+ ProducerImpl<?> producer = producers.remove(producerId);
if (producer != null) {
producer.connectionClosed(this);
} else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a3a00b1b70e..6ce4afecd02 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public class ClientCnxTest {
@Test
public void testHandleCloseConsumer() {
- ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+ ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
ClientCnx cnx = new ClientCnx(conf, eventLoop);
@@ -165,11 +166,28 @@ public class ClientCnxTest {
cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
assertEquals(cnx.consumers.size(), 1);
- CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
- .setConsumerId(1);
+ CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId);
cnx.handleCloseConsumer(closeConsumer);
assertEquals(cnx.consumers.size(), 0);
eventLoop.shutdownGracefully();
}
+
+ @Test
+ public void testHandleCloseProducer() {
+ ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+ ClientConfigurationData conf = new ClientConfigurationData();
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ long producerId = 1;
+ cnx.registerProducer(producerId, mock(ProducerImpl.class));
+ assertEquals(cnx.producers.size(), 1);
+
+ CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId);
+ cnx.handleCloseProducer(closeProducerCmd);
+ assertEquals(cnx.producers.size(), 0);
+
+ eventLoop.shutdownGracefully();
+ }
}