You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/27 10:58:28 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make getBacklog method async (#16218)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new debcf513391 [improve][broker][PIP-149]make getBacklog method async (#16218)
debcf513391 is described below

commit debcf513391e9c2550e3d07b0043cd4b168208fd
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 18:58:16 2022 +0800

    [improve][broker][PIP-149]make getBacklog method async (#16218)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 66 +++++++++++-----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 29 ++++++++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 23 ++++++--
 3 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f2b4d72c7b0..75ba3b2c7db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3046,44 +3046,46 @@ public class PersistentTopicsBase extends AdminResource {
         return responseBuilder.entity(stream).build();
     }
 
-    protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
+    protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync(boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
         // Validate that namespace exists, throw 404 if it doesn't exist
         // note that we do not want to load the topic and hence skip authorization check
-        try {
-            namespaceResources().getPolicies(namespaceName);
-        } catch (MetadataStoreException.NotFoundException e) {
-            log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to get topic backlog {}", clientAppId(), namespaceName, e);
-            throw new RestException(e);
-        }
+        return ret.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
+                .thenCompose(__ -> {
+                    PersistentOfflineTopicStats offlineTopicStats =
+                            pulsar().getBrokerService().getOfflineTopicStat(topicName);
+                    if (offlineTopicStats != null) {
+                        // offline topic stat has a cost - so use cached value until TTL
+                        long elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime();
+                        if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < OFFLINE_TOPIC_STAT_TTL_MINS) {
+                            return CompletableFuture.completedFuture(offlineTopicStats);
+                        }
+                    }
 
-        PersistentOfflineTopicStats offlineTopicStats = null;
-        try {
+                    return pulsar().getBrokerService().getManagedLedgerConfig(topicName)
+                            .thenCompose(config -> {
+                                ManagedLedgerOfflineBacklog offlineTopicBacklog =
+                                        new ManagedLedgerOfflineBacklog(config.getDigestType(), config.getPassword(),
+                                                pulsar().getAdvertisedAddress(), false);
+                                try {
+                                    PersistentOfflineTopicStats estimateOfflineTopicStats =
+                                            offlineTopicBacklog.estimateUnloadedTopicBacklog(
+                                                    (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(),
+                                                    topicName);
+                                    pulsar().getBrokerService()
+                                            .cacheOfflineTopicStats(topicName, estimateOfflineTopicStats);
+                                    return CompletableFuture.completedFuture(estimateOfflineTopicStats);
+                                } catch (Exception e) {
+                                    throw new RestException(e);
+                                }
+                            });
 
-            offlineTopicStats = pulsar().getBrokerService().getOfflineTopicStat(topicName);
-            if (offlineTopicStats != null) {
-                // offline topic stat has a cost - so use cached value until TTL
-                long elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime();
-                if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < OFFLINE_TOPIC_STAT_TTL_MINS) {
-                    return offlineTopicStats;
-                }
-            }
-            final ManagedLedgerConfig config = pulsar().getBrokerService().getManagedLedgerConfig(topicName)
-                    .get();
-            ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(config.getDigestType(),
-                    config.getPassword(), pulsar().getAdvertisedAddress(), false);
-            offlineTopicStats = offlineTopicBacklog.estimateUnloadedTopicBacklog(
-                    (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), topicName);
-            pulsar().getBrokerService().cacheOfflineTopicStats(topicName, offlineTopicStats);
-        } catch (Exception exception) {
-            throw new RestException(exception);
-        }
-        return offlineTopicStats;
+                });
     }
 
     protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> internalGetBacklogQuota(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 99c10c62f4b..a3689731436 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -795,14 +796,30 @@ public class PersistentTopics extends PersistentTopicsBase {
     @GET
     @Path("{property}/{cluster}/{namespace}/{topic}/backlog")
     @ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public PersistentOfflineTopicStats getBacklog(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public void getBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetBacklog(authoritative);
+        internalGetBacklogAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    Throwable t = FutureUtil.unwrapCompletionException(ex);
+                    if (t instanceof MetadataStoreException.NotFoundException) {
+                        log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
+                                namespaceName);
+                        ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
+                    } else if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3f3c34d78d3..55bacfec03c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -70,6 +69,8 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1848,8 +1849,9 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = {
             @ApiResponse(code = 404, message = "Namespace does not exist"),
             @ApiResponse(code = 412, message = "Topic name is not valid"),
-            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
-    public PersistentOfflineTopicStats getBacklog(
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void getBacklog(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -1859,7 +1861,20 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetBacklog(authoritative);
+        internalGetBacklogAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    Throwable t = FutureUtil.unwrapCompletionException(ex);
+                    if (t instanceof MetadataStoreException.NotFoundException) {
+                        log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
+                                namespaceName);
+                        ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
+                    } else if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT