You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/05/13 04:12:13 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4040977025d [fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)
4040977025d is described below

commit 4040977025da6b885b2cc2d0195f5589fceb7704
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 12 21:06:46 2022 -0700

    [fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)
    
    * Fix deadlock in broker after race condition in topic creation failure
    
    * Fixed checkstyle
---
 .../main/java/org/apache/pulsar/broker/service/BrokerService.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a9f9c7fe090..5dc5edf9f08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1360,9 +1360,9 @@ public class BrokerService implements Closeable {
                                     if (topicFuture.isCompletedExceptionally()) {
                                         log.warn("{} future is already completed with failure {}, closing the topic",
                                                 topic, FutureUtil.getException(topicFuture));
-                                        persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                        persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
                                             topics.remove(topic, topicFuture);
-                                        });
+                                        }, executor());
                                     } else {
                                         addTopicToStatsMaps(topicName, persistentTopic);
                                         topicFuture.complete(Optional.of(persistentTopic));
@@ -1372,10 +1372,10 @@ public class BrokerService implements Closeable {
                                             "Replication or dedup check failed."
                                                     + " Removing topic from topics list {}, {}",
                                             topic, ex);
-                                    persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                    persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
                                         topics.remove(topic, topicFuture);
                                         topicFuture.completeExceptionally(ex);
-                                    });
+                                    }, executor());
 
                                     return null;
                                 });