Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/08 20:30:02 UTC

[GitHub] [pulsar] rdhabalia opened a new pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

rdhabalia opened a new pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196


   ### Motivation
   
   We have frequently seen an issue with exclusive subscriptions where the broker doesn't clean up a consumer when its connection is closed. As a result, new exclusive consumers keep failing with `ConsumerBusyException` even though no consumer is actually connected to the broker.
   The log excerpt below shows the race: the consumer disconnects right after requesting the subscription and the connection is closed, but the broker still fails to clean up the consumer, so subsequent consumer creation requests keep failing.
   
   ```
   23:30:16.896 [pulsar-io-23-42] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1
   :
   23:30:17.223 [pulsar-io-23-42] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Closed consumer, consumerId=20
   :
   23:30:17.291 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Created new subscription for 25
   :
   23:30:17.301 [pulsar-io-23-42] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.11.12.13:60851
   :
   23:30:17.302 [pulsar-io-23-42] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://my-prop/my-cluster/ns/topic, name=sub1}, consumerId=21, consumerName=c2302, address=/10.11.12.13:60851}
   :
   23:30:17.302 [bookkeeper-ml-workers-OrderedExecutor-1-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60851] Created subscription on topic persistent://my-prop/my-cluster/ns/topic / sub1
   :
   :
   :
   
   23:30:17.496 [pulsar-io-23-36] WARN  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already connected
   23:30:17.885 [pulsar-io-23-36] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1
   23:30:17.886 [pulsar-io-23-36] WARN  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://my-prop/my-cluster/ns/topic][sub1] Consumer 25 7a977 already connected
   23:30:18.637 [pulsar-io-23-36] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.11.12.13:60853] Subscribing on topic persistent://my-prop/my-cluster/ns/topic / sub1
   ```
   
   ### Modification
   The broker should clean up the active consumer if its connection is already closed, so that a new consumer can connect as the active consumer.
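
A minimal sketch of the behavior described above, not the broker's actual code: `SketchConnection`, `SketchConsumer`, and `ExclusiveSubscriptionSketch` are hypothetical stand-ins for the connection, the consumer, and the exclusive subscription. When a new consumer is rejected because the subscription is busy, the sketch checks whether the current owner's connection is still active and evicts the owner if it is not, so that a retry can succeed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a broker-side connection; not Pulsar's ServerCnx.
class SketchConnection {
    private volatile boolean active = true;

    boolean isActive() {
        return active;
    }

    void markClosed() {
        active = false;
    }
}

// Hypothetical stand-in for a broker-side consumer.
class SketchConsumer {
    final String name;
    final SketchConnection cnx;

    SketchConsumer(String name, SketchConnection cnx) {
        this.name = name;
        this.cnx = cnx;
    }
}

// Hypothetical exclusive subscription that allows a single active consumer.
class ExclusiveSubscriptionSketch {
    private final AtomicReference<SketchConsumer> activeConsumer = new AtomicReference<>();

    /** Returns true if the candidate became the active consumer. */
    boolean tryAdd(SketchConsumer candidate) {
        if (activeConsumer.compareAndSet(null, candidate)) {
            return true;
        }
        // "Consumer busy" path: evict the owner only if its connection is already closed.
        SketchConsumer current = activeConsumer.get();
        if (current != null && !current.cnx.isActive()) {
            activeConsumer.compareAndSet(current, null);
        }
        return false; // the caller is expected to retry
    }

    SketchConsumer getActiveConsumer() {
        return activeConsumer.get();
    }
}
```

In this sketch the first attempt after a stale owner still fails, and only the retry becomes the active consumer, which is presumably why the PR's test retries the subscribe call.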


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r765386892



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();
+                        }
+                    } catch (Exception be) {
+                        log.info("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());

Review comment:
       Any reason this log is info level? 







[GitHub] [pulsar] rdhabalia commented on pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#issuecomment-1007859255


   ping. can someone please review the PR?





[GitHub] [pulsar] Jason918 commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r773553165



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();

Review comment:
       > however, in any case it would be safe to call close on an already closed consumer
   
   Great to know. Thx







[GitHub] [pulsar] codelipenghui merged pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196


   





[GitHub] [pulsar] rdhabalia commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r772857677



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();

Review comment:
       No need to check the closed state: if the consumer's connection is not active and the consumer still pretends to be the active consumer, it requires cleanup by closing it immediately.







[GitHub] [pulsar] Jason918 commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r772873784



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();

Review comment:
       > if the consumer's connection is not active and the consumer still pretends to be the active consumer, it requires cleanup by closing it immediately.
   
   I get what you are trying to do, and I am +1 for this.
   
   But do you agree that `consumer.close` may be called in two threads at the same time? 
   One is here, and the other may be in `org.apache.pulsar.broker.service.ServerCnx#channelInactive`.
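
One common way to make a concurrent double close harmless is a compare-and-set guard. The sketch below assumes nothing about how Pulsar's `Consumer.close` is implemented; `IdempotentCloseSketch` and its counter are hypothetical and only illustrate that the cleanup body runs once, no matter how many callers race.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer-like object whose close() may be called from several threads.
class IdempotentCloseSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    /** Runs the cleanup at most once; returns true only for the caller that performed it. */
    boolean close() {
        // Only one caller can flip the flag from false to true.
        if (!closed.compareAndSet(false, true)) {
            return false;
        }
        // Stand-in for the real cleanup, e.g. removing the consumer from its dispatcher.
        cleanupRuns.incrementAndGet();
        return true;
    }

    int cleanupRuns() {
        return cleanupRuns.get();
    }
}
```

With such a guard, it does not matter whether the close triggered here or the one triggered from `channelInactive` wins; the loser returns without touching shared state.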
   







[GitHub] [pulsar] github-actions[bot] commented on pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#issuecomment-989172008


   @rdhabalia:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)





[GitHub] [pulsar] rdhabalia commented on pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#issuecomment-999139688


   ping. 





[GitHub] [pulsar] rdhabalia commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r770224974



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();
+                        }
+                    } catch (Exception be) {
+                        log.info("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());

Review comment:
       @Jason918 changed to error log level.
   
   @pkumar-singh this part handles resiliency to avoid consumer unavailability due to any kind of race condition. 







[GitHub] [pulsar] rdhabalia commented on pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#issuecomment-998206635


   ping. PR is ready to merge.
   @Jason918 PTAL





[GitHub] [pulsar] Anonymitaet commented on pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#issuecomment-989446071


   @rdhabalia is this a bug fix? (do not need to update docs?)








[GitHub] [pulsar] Jason918 commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r772775368



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();

Review comment:
       In my understanding, this consumer should normally be closed elsewhere. This means there is a chance that `consumer.close` will be called multiple times. I think we should add a closed-state check in `consumer.close` to avoid other unexpected issues.










[GitHub] [pulsar] rdhabalia commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r765390545



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();
+                        }
+                    } catch (Exception be) {
+                        log.info("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());

Review comment:
       it's just info and doesn't have to be critical. therefore, log level is info.







[GitHub] [pulsar] pkumar-singh commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
pkumar-singh commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r768282046



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();
+                        }
+                    } catch (Exception be) {
+                        log.info("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());

Review comment:
       To me, it seems like a band-aid. I think we should look at how we landed in this situation. Perhaps we should have cleaned up the consumer at connection close itself. If there is a race condition, we should fix the race condition itself.







[GitHub] [pulsar] eolivelli commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r780710799



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
##########
@@ -997,4 +999,53 @@ public void testConsumerWithPooledMessagesWithReader(boolean isBatchingEnabled)
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testActiveConsumerCleanup() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numMessages = 100;
+        final CountDownLatch latch = new CountDownLatch(numMessages);
+        String topic = "persistent://my-property/my-ns/closed-cnx-topic";
+        String sub = "my-subscriber-name";
+
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> {
+            Assert.assertNotNull(msg, "Message cannot be null");
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message [{}] in the listener", receivedMessage);
+            c1.acknowledgeAsync(msg);
+            latch.countDown();
+        }).subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+
+        AbstractDispatcherSingleActiveConsumer dispatcher = (AbstractDispatcherSingleActiveConsumer) topicRef
+                .getSubscription(sub).getDispatcher();
+        ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
+        Field field = ServerCnx.class.getDeclaredField("isActive");
+        field.setAccessible(true);
+        field.set(cnx, false);
+
+        assertNotNull(dispatcher.getActiveConsumer());
+
+        pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        Consumer<byte[]> consumer = null;
+        for (int i = 0; i < 2; i++) {
+            try {
+                consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    c1.acknowledgeAsync(msg);
+                    latch.countDown();
+                }).subscribe();
+            } catch (Exception e) {
+                // Ok

Review comment:
       Can we perform some checks on this exception? At least match the Java class.
   Otherwise the test can pass here even for other kinds of bugs.
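
A sketch of what such a check could look like, under stated assumptions: `ConsumerBusySketchException` is a hypothetical stand-in (in the real test it would presumably be the client-side counterpart of `ConsumerBusyException`), and `succeedsOrFailsWith` is an illustrative helper, not an existing test utility. The helper tolerates only the expected exception type and turns anything else into a test failure.

```java
import java.util.concurrent.Callable;

class ExpectedFailureSketch {
    /** Hypothetical stand-in for the client's "consumer busy" exception. */
    static class ConsumerBusySketchException extends Exception {
        ConsumerBusySketchException(String message) {
            super(message);
        }
    }

    /**
     * Runs the action. Returns true if it succeeded and false if it failed with the
     * expected exception type; any other exception is rethrown as an AssertionError.
     */
    static boolean succeedsOrFailsWith(Class<? extends Exception> expected, Callable<?> action) {
        try {
            action.call();
            return true;
        } catch (Exception e) {
            if (expected.isInstance(e)) {
                return false; // the tolerated failure, e.g. the first "busy" attempt
            }
            throw new AssertionError("Unexpected exception type: " + e.getClass().getName(), e);
        }
    }
}
```

In the retry loop of the test, the `catch (Exception e)` body would then only swallow the busy failure, and an unrelated error would fail the test instead of being ignored.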







[GitHub] [pulsar] rdhabalia commented on pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#issuecomment-998398712


   ping





[GitHub] [pulsar] rdhabalia commented on a change in pull request #13196: [pulsar-broker] clean up active consumer on already closed connection

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #13196:
URL: https://github.com/apache/pulsar/pull/13196#discussion_r773483490



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -804,6 +804,16 @@ protected void handleProducerRemoved(Producer producer) {
                 if (ex.getCause() instanceof ConsumerBusyException) {
                     log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                             consumerName);
+                    Consumer consumer = null;
+                    try {
+                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
+                        // cleanup consumer if connection is already closed
+                        if (consumer != null && !consumer.cnx().isActive()) {
+                            consumer.close();

Review comment:
       It won't happen in this code path. However, in any case it would be safe to call close on an already closed consumer.






