You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/27 01:05:27 UTC

[pulsar] branch master updated: [improve][broker] Make some methods of `BrokersBase` pure async. (#15281)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a0c2c59c24e [improve][broker] Make some methods of `BrokersBase` pure async. (#15281)
a0c2c59c24e is described below

commit a0c2c59c24ebdf07a26394d888f0222e56961ad3
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 27 09:05:20 2022 +0800

    [improve][broker] Make some methods of `BrokersBase` pure async. (#15281)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 43 ++++++++++++--------
 .../pulsar/broker/service/BrokerService.java       | 47 ++++++++++++----------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  8 ++--
 3 files changed, 56 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5a3db2302ee..8e9167a886c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -247,9 +246,14 @@ public class BrokersBase extends AdminResource {
     @Path("/configuration/runtime")
     @ApiOperation(value = "Get all runtime configurations. This operation requires Pulsar super-user privileges.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public Map<String, String> getRuntimeConfiguration() {
-        validateSuperUserAccess();
-        return pulsar().getBrokerService().getRuntimeConfiguration();
+    public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) {
+        validateSuperUserAccessAsync()
+                .thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getRuntimeConfiguration()))
+                .exceptionally(ex -> {
+                    LOG.error("[{}] Failed to get runtime configuration.", clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     /**
@@ -283,9 +287,14 @@ public class BrokersBase extends AdminResource {
     @Path("/internal-configuration")
     @ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public InternalConfigurationData getInternalConfigurationData() {
-        validateSuperUserAccess();
-        return pulsar().getInternalConfigurationData();
+    public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) {
+        validateSuperUserAccessAsync()
+                .thenAccept(__ -> asyncResponse.resume(pulsar().getInternalConfigurationData()))
+                .exceptionally(ex -> {
+                    LOG.error("[{}] Failed to get internal configuration data.", clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -296,16 +305,16 @@ public class BrokersBase extends AdminResource {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
-        validateSuperUserAccess();
-        pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{
-            try {
-                pulsar().getBrokerService().monitorBacklogQuota();
-                asyncResponse.resume(Response.noContent().build());
-            } catch (Exception e) {
-                LOG.error("trigger backlogQuotaCheck fail", e);
-                asyncResponse.resume(new RestException(e));
-            }
-        }));
+        validateSuperUserAccessAsync()
+                .thenAcceptAsync(__ -> {
+                    pulsar().getBrokerService().monitorBacklogQuota();
+                    asyncResponse.resume(Response.noContent().build());
+                } , pulsar().getBrokerService().getBacklogQuotaChecker())
+                .exceptionally(ex -> {
+                    LOG.error("[{}] Failed to trigger backlog quota check.", clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c3f3dc64e5a..f6af11f1e65 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1803,34 +1803,37 @@ public class BrokerService implements Closeable {
         });
     }
 
+    public void forEachPersistentTopic(Consumer<PersistentTopic> consumer) {
+        topics.values().stream().map(BrokerService::extractTopic)
+                .map(topicOp -> topicOp.filter(topic -> topic instanceof PersistentTopic))
+                .forEach(topicOp -> topicOp.ifPresent(topic -> consumer.accept((PersistentTopic) topic)));
+    }
+
     public BacklogQuotaManager getBacklogQuotaManager() {
         return this.backlogQuotaManager;
     }
 
     public void monitorBacklogQuota() {
-        forEachTopic(topic -> {
-            if (topic instanceof PersistentTopic) {
-                PersistentTopic persistentTopic = (PersistentTopic) topic;
-                if (persistentTopic.isSizeBacklogExceeded()) {
-                    getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic,
-                            BacklogQuota.BacklogQuotaType.destination_storage, false);
-                } else {
-                    persistentTopic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
-                        if (isExceeded) {
-                            getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic,
-                                    BacklogQuota.BacklogQuotaType.message_age,
-                                    pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("quota not exceeded for [{}]", topic.getName());
-                            }
+        forEachPersistentTopic(topic -> {
+            if (topic.isSizeBacklogExceeded()) {
+                getBacklogQuotaManager().handleExceededBacklogQuota(topic,
+                        BacklogQuota.BacklogQuotaType.destination_storage, false);
+            } else {
+                topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
+                    if (isExceeded) {
+                        getBacklogQuotaManager().handleExceededBacklogQuota(topic,
+                                BacklogQuota.BacklogQuotaType.message_age,
+                                pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("quota not exceeded for [{}]", topic.getName());
                         }
-                    }).exceptionally(throwable -> {
-                        log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
-                                persistentTopic.getName(), throwable);
-                        return null;
-                    });
-                }
+                    }
+                }).exceptionally(throwable -> {
+                    log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
+                            topic.getName(), throwable);
+                    return null;
+                });
             }
         });
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 11bed0583b7..b6f2e3b8b8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.AllocatorStats;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -194,9 +195,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 conf.getConfigurationMetadataStoreUrl(),
                 new ClientConfiguration().getZkLedgersRootPath(),
                 conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null,
-                pulsar.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
-
-        assertEquals(brokers.getInternalConfigurationData(), expectedData);
+                pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
+        Object response = asynRequests(ctx -> brokers.getInternalConfigurationData(ctx));
+        assertTrue(response instanceof InternalConfigurationData);
+        assertEquals(response, expectedData);
     }
 
     @Test