You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/22 11:53:16 UTC

[pulsar] 07/07: [Broker] Fix race conditions in closing producers and consumers (#13428)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit da61b915d9f648754b40dc928430746d89d00cbb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Dec 22 07:05:48 2021 +0200

    [Broker] Fix race conditions in closing producers and consumers (#13428)
    
    - closing ServerCnx while producers or consumers are still being
      created can lead to a producer or consumer never getting removed
      from the topic's list of producers
    
    (cherry picked from commit 3316db5a52cdeee49bc90fe18baac28d5688bfe8)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a5d55bf..dbab10d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -292,6 +292,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         // Connection is gone, close the producers immediately
         producers.forEach((__, producerFuture) -> {
+            // prevent race conditions in completing producers
+            if (!producerFuture.isDone()
+                    && producerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
+                return;
+            }
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
@@ -299,17 +304,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         });
 
         consumers.forEach((__, consumerFuture) -> {
-            Consumer consumer;
-            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
-                consumer = consumerFuture.getNow(null);
-            } else {
+            // prevent race conditions in completing consumers
+            if (!consumerFuture.isDone()
+                    && consumerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
                 return;
             }
-
-            try {
-                consumer.close();
-            } catch (BrokerServiceException e) {
-                log.warn("Consumer {} was already closed: {}", consumer, e);
+            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                Consumer consumer = consumerFuture.getNow(null);
+                try {
+                    consumer.close();
+                } catch (BrokerServiceException e) {
+                    log.warn("Consumer {} was already closed: {}", consumer, e);
+                }
             }
         });
         this.service.getPulsarStats().recordConnectionClose();