You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/27 01:05:27 UTC
[pulsar] branch master updated: [improve][broker] Make some methods of `BrokersBase` pure async. (#15281)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a0c2c59c24e [improve][broker] Make some methods of `BrokersBase` pure async. (#15281)
a0c2c59c24e is described below
commit a0c2c59c24ebdf07a26394d888f0222e56961ad3
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 27 09:05:20 2022 +0800
[improve][broker] Make some methods of `BrokersBase` pure async. (#15281)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 43 ++++++++++++--------
.../pulsar/broker/service/BrokerService.java | 47 ++++++++++++----------
.../org/apache/pulsar/broker/admin/AdminTest.java | 8 ++--
3 files changed, 56 insertions(+), 42 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5a3db2302ee..8e9167a886c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -247,9 +246,14 @@ public class BrokersBase extends AdminResource {
@Path("/configuration/runtime")
@ApiOperation(value = "Get all runtime configurations. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public Map<String, String> getRuntimeConfiguration() {
- validateSuperUserAccess();
- return pulsar().getBrokerService().getRuntimeConfiguration();
+ public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) {
+ validateSuperUserAccessAsync()
+ .thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getRuntimeConfiguration()))
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to get runtime configuration.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
/**
@@ -283,9 +287,14 @@ public class BrokersBase extends AdminResource {
@Path("/internal-configuration")
@ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public InternalConfigurationData getInternalConfigurationData() {
- validateSuperUserAccess();
- return pulsar().getInternalConfigurationData();
+ public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) {
+ validateSuperUserAccessAsync()
+ .thenAccept(__ -> asyncResponse.resume(pulsar().getInternalConfigurationData()))
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to get internal configuration data.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -296,16 +305,16 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
- validateSuperUserAccess();
- pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{
- try {
- pulsar().getBrokerService().monitorBacklogQuota();
- asyncResponse.resume(Response.noContent().build());
- } catch (Exception e) {
- LOG.error("trigger backlogQuotaCheck fail", e);
- asyncResponse.resume(new RestException(e));
- }
- }));
+ validateSuperUserAccessAsync()
+ .thenAcceptAsync(__ -> {
+ pulsar().getBrokerService().monitorBacklogQuota();
+ asyncResponse.resume(Response.noContent().build());
+ } , pulsar().getBrokerService().getBacklogQuotaChecker())
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to trigger backlog quota check.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c3f3dc64e5a..f6af11f1e65 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1803,34 +1803,37 @@ public class BrokerService implements Closeable {
});
}
+ public void forEachPersistentTopic(Consumer<PersistentTopic> consumer) {
+ topics.values().stream().map(BrokerService::extractTopic)
+ .map(topicOp -> topicOp.filter(topic -> topic instanceof PersistentTopic))
+ .forEach(topicOp -> topicOp.ifPresent(topic -> consumer.accept((PersistentTopic) topic)));
+ }
+
public BacklogQuotaManager getBacklogQuotaManager() {
return this.backlogQuotaManager;
}
public void monitorBacklogQuota() {
- forEachTopic(topic -> {
- if (topic instanceof PersistentTopic) {
- PersistentTopic persistentTopic = (PersistentTopic) topic;
- if (persistentTopic.isSizeBacklogExceeded()) {
- getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic,
- BacklogQuota.BacklogQuotaType.destination_storage, false);
- } else {
- persistentTopic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
- if (isExceeded) {
- getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic,
- BacklogQuota.BacklogQuotaType.message_age,
- pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
- } else {
- if (log.isDebugEnabled()) {
- log.debug("quota not exceeded for [{}]", topic.getName());
- }
+ forEachPersistentTopic(topic -> {
+ if (topic.isSizeBacklogExceeded()) {
+ getBacklogQuotaManager().handleExceededBacklogQuota(topic,
+ BacklogQuota.BacklogQuotaType.destination_storage, false);
+ } else {
+ topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
+ if (isExceeded) {
+ getBacklogQuotaManager().handleExceededBacklogQuota(topic,
+ BacklogQuota.BacklogQuotaType.message_age,
+ pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("quota not exceeded for [{}]", topic.getName());
}
- }).exceptionally(throwable -> {
- log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
- persistentTopic.getName(), throwable);
- return null;
- });
- }
+ }
+ }).exceptionally(throwable -> {
+ log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
+ topic.getName(), throwable);
+ return null;
+ });
}
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 11bed0583b7..b6f2e3b8b8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -194,9 +195,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
conf.getConfigurationMetadataStoreUrl(),
new ClientConfiguration().getZkLedgersRootPath(),
conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null,
- pulsar.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
-
- assertEquals(brokers.getInternalConfigurationData(), expectedData);
+ pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
+ Object response = asynRequests(ctx -> brokers.getInternalConfigurationData(ctx));
+ assertTrue(response instanceof InternalConfigurationData);
+ assertEquals(response, expectedData);
}
@Test