You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 09:01:54 UTC
[pulsar] 04/10: [ Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f33115b7e60cf4d9e2bc4084d928b73cc970c4c1
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Tue Mar 15 07:12:45 2022 -0700
[ Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)
(cherry picked from commit 06ed9445bf29ea30b1f094b2d0ff608cb76aa3f6)
---
.../main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 5 +++--
.../main/java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++-
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 15303b1..e43336b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
@@ -271,7 +272,7 @@ public class BrokersBase extends PulsarWebResource {
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccess();
- pulsar().getBrokerService().executor().execute(()->{
+ pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{
try {
pulsar().getBrokerService().monitorBacklogQuota();
asyncResponse.resume(Response.noContent().build());
@@ -279,7 +280,7 @@ public class BrokersBase extends PulsarWebResource {
LOG.error("trigger backlogQuotaCheck fail", e);
asyncResponse.resume(new RestException(e));
}
- });
+ }));
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a9d417c..0003121 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -221,6 +221,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AuthorizationService authorizationService = null;
private final ScheduledExecutorService statsUpdater;
+ @Getter
private final ScheduledExecutorService backlogQuotaChecker;
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@@ -1691,7 +1692,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return this.backlogQuotaManager;
}
- public synchronized void monitorBacklogQuota() {
+ public void monitorBacklogQuota() {
forEachTopic(topic -> {
if (topic instanceof PersistentTopic) {
PersistentTopic persistentTopic = (PersistentTopic) topic;