You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/27 10:58:28 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make getBacklog method async (#16218)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new debcf513391 [improve][broker][PIP-149]make getBacklog method async (#16218)
debcf513391 is described below
commit debcf513391e9c2550e3d07b0043cd4b168208fd
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 18:58:16 2022 +0800
[improve][broker][PIP-149]make getBacklog method async (#16218)
---
.../broker/admin/impl/PersistentTopicsBase.java | 66 +++++++++++-----------
.../pulsar/broker/admin/v1/PersistentTopics.java | 29 ++++++++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 23 ++++++--
3 files changed, 76 insertions(+), 42 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f2b4d72c7b0..75ba3b2c7db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3046,44 +3046,46 @@ public class PersistentTopicsBase extends AdminResource {
return responseBuilder.entity(stream).build();
}
- protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
+ protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync(boolean authoritative) {
+ CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
+ ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ ret = CompletableFuture.completedFuture(null);
}
// Validate that namespace exists, throw 404 if it doesn't exist
// note that we do not want to load the topic and hence skip authorization check
- try {
- namespaceResources().getPolicies(namespaceName);
- } catch (MetadataStoreException.NotFoundException e) {
- log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
- } catch (Exception e) {
- log.error("[{}] Failed to get topic backlog {}", clientAppId(), namespaceName, e);
- throw new RestException(e);
- }
+ return ret.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
+ .thenCompose(__ -> {
+ PersistentOfflineTopicStats offlineTopicStats =
+ pulsar().getBrokerService().getOfflineTopicStat(topicName);
+ if (offlineTopicStats != null) {
+ // offline topic stat has a cost - so use cached value until TTL
+ long elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime();
+ if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < OFFLINE_TOPIC_STAT_TTL_MINS) {
+ return CompletableFuture.completedFuture(offlineTopicStats);
+ }
+ }
- PersistentOfflineTopicStats offlineTopicStats = null;
- try {
+ return pulsar().getBrokerService().getManagedLedgerConfig(topicName)
+ .thenCompose(config -> {
+ ManagedLedgerOfflineBacklog offlineTopicBacklog =
+ new ManagedLedgerOfflineBacklog(config.getDigestType(), config.getPassword(),
+ pulsar().getAdvertisedAddress(), false);
+ try {
+ PersistentOfflineTopicStats estimateOfflineTopicStats =
+ offlineTopicBacklog.estimateUnloadedTopicBacklog(
+ (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(),
+ topicName);
+ pulsar().getBrokerService()
+ .cacheOfflineTopicStats(topicName, estimateOfflineTopicStats);
+ return CompletableFuture.completedFuture(estimateOfflineTopicStats);
+ } catch (Exception e) {
+ throw new RestException(e);
+ }
+ });
- offlineTopicStats = pulsar().getBrokerService().getOfflineTopicStat(topicName);
- if (offlineTopicStats != null) {
- // offline topic stat has a cost - so use cached value until TTL
- long elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime();
- if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < OFFLINE_TOPIC_STAT_TTL_MINS) {
- return offlineTopicStats;
- }
- }
- final ManagedLedgerConfig config = pulsar().getBrokerService().getManagedLedgerConfig(topicName)
- .get();
- ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(),
- config.getPassword(), pulsar().getAdvertisedAddress(), false);
- offlineTopicStats = offlineTopicBacklog.estimateUnloadedTopicBacklog(
- (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), topicName);
- pulsar().getBrokerService().cacheOfflineTopicStats(topicName, offlineTopicStats);
- } catch (Exception exception) {
- throw new RestException(exception);
- }
- return offlineTopicStats;
+ });
}
protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> internalGetBacklogQuota(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 99c10c62f4b..a3689731436 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -795,14 +796,30 @@ public class PersistentTopics extends PersistentTopicsBase {
@GET
@Path("{property}/{cluster}/{namespace}/{topic}/backlog")
@ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Namespace does not exist") })
- public PersistentOfflineTopicStats getBacklog(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist")})
+ public void getBacklog(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetBacklog(authoritative);
+ internalGetBacklogAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (t instanceof MetadataStoreException.NotFoundException) {
+ log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
+ namespaceName);
+ ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
+ } else if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3f3c34d78d3..55bacfec03c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -70,6 +69,8 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1848,8 +1849,9 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = {
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
- @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
- public PersistentOfflineTopicStats getBacklog(
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+ public void getBacklog(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -1859,7 +1861,20 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalGetBacklog(authoritative);
+ internalGetBacklogAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (t instanceof MetadataStoreException.NotFoundException) {
+ log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
+ namespaceName);
+ ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
+ } else if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT