You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:23 UTC

[pulsar] 09/14: [Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4181d038a95281da088529f10ba78e43774a160f
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Tue Mar 15 07:12:45 2022 -0700

    [Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)
    
    (cherry picked from commit 06ed9445bf29ea30b1f094b2d0ff608cb76aa3f6)
---
 .../main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java   | 5 +++--
 .../main/java/org/apache/pulsar/broker/service/BrokerService.java    | 3 ++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index eda186a..5f73bc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -286,7 +287,7 @@ public class BrokersBase extends AdminResource {
             @ApiResponse(code = 500, message = "Internal server error")})
     public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
         validateSuperUserAccess();
-        pulsar().getBrokerService().executor().execute(()->{
+        pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{
             try {
                 pulsar().getBrokerService().monitorBacklogQuota();
                 asyncResponse.resume(Response.noContent().build());
@@ -294,7 +295,7 @@ public class BrokersBase extends AdminResource {
                 LOG.error("trigger backlogQuotaCheck fail", e);
                 asyncResponse.resume(new RestException(e));
             }
-        });
+        }));
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2b50365..47a2aae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -218,6 +218,7 @@ public class BrokerService implements Closeable {
 
     private AuthorizationService authorizationService = null;
     private final ScheduledExecutorService statsUpdater;
+    @Getter
     private final ScheduledExecutorService backlogQuotaChecker;
 
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@@ -1754,7 +1755,7 @@ public class BrokerService implements Closeable {
         return this.backlogQuotaManager;
     }
 
-    public synchronized void monitorBacklogQuota() {
+    public void monitorBacklogQuota() {
         forEachTopic(topic -> {
             if (topic instanceof PersistentTopic) {
                 PersistentTopic persistentTopic = (PersistentTopic) topic;