You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/19 06:02:23 UTC
[pulsar] 09/14: [Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4181d038a95281da088529f10ba78e43774a160f
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Tue Mar 15 07:12:45 2022 -0700
[Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)
(cherry picked from commit 06ed9445bf29ea30b1f094b2d0ff608cb76aa3f6)
---
.../main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 5 +++--
.../main/java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++-
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index eda186a..5f73bc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -286,7 +287,7 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccess();
- pulsar().getBrokerService().executor().execute(()->{
+ pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{
try {
pulsar().getBrokerService().monitorBacklogQuota();
asyncResponse.resume(Response.noContent().build());
@@ -294,7 +295,7 @@ public class BrokersBase extends AdminResource {
LOG.error("trigger backlogQuotaCheck fail", e);
asyncResponse.resume(new RestException(e));
}
- });
+ }));
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2b50365..47a2aae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -218,6 +218,7 @@ public class BrokerService implements Closeable {
private AuthorizationService authorizationService = null;
private final ScheduledExecutorService statsUpdater;
+ @Getter
private final ScheduledExecutorService backlogQuotaChecker;
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@@ -1754,7 +1755,7 @@ public class BrokerService implements Closeable {
return this.backlogQuotaManager;
}
- public synchronized void monitorBacklogQuota() {
+ public void monitorBacklogQuota() {
forEachTopic(topic -> {
if (topic instanceof PersistentTopic) {
PersistentTopic persistentTopic = (PersistentTopic) topic;