You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 11:28:19 UTC

[pulsar] branch branch-2.9 updated: [broker] Fix deadlock in metadata-store callback thread for branch 2.9 (#13426)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new c5a4639  [broker] Fix deadlock in metadata-store callback thread for branch 2.9 (#13426)
c5a4639 is described below

commit c5a4639e87e8c9dd00cbd8bcb040072c9fc0c852
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Dec 21 19:24:37 2021 +0800

    [broker] Fix deadlock in metadata-store callback thread for branch 2.9 (#13426)
    
    Fixes #12726 for branch 2.9
    
    ### Motivation
    
    See #12726
    
    #12753 fixed this in master, but can not merged into branch 2.9.
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 30ce9c1..a5d55bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1185,7 +1185,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
                         log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
 
-                        service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
+                        service.getOrCreateTopic(topicName.toString()).thenAcceptAsync((Topic topic) -> {
                             // Before creating producer, check if backlog quota exceeded
                             // on topic for size based limit and time based limit
                             for (BacklogQuota.BacklogQuotaType backlogQuotaType :
@@ -1250,7 +1250,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     return null;
                                 });
                             });
-                        }).exceptionally(exception -> {
+                        }, getBrokerService().getPulsar().getExecutor()).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
                             if (cause instanceof NoSuchElementException) {
                                 cause = new TopicNotFoundException("Topic Not Found.");