You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/28 00:39:25 UTC

[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16218: [improve][broker][PIP-149]make getBacklog method async

mattisonchao commented on code in PR #16218:
URL: https://github.com/apache/pulsar/pull/16218#discussion_r907917870


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -3040,44 +3040,46 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
         return responseBuilder.entity(stream).build();
     }
 
-    protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
+    protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync(boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
         // Validate that namespace exists, throw 404 if it doesn't exist
         // note that we do not want to load the topic and hence skip authorization check
-        try {
-            namespaceResources().getPolicies(namespaceName);
-        } catch (MetadataStoreException.NotFoundException e) {
-            log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName);
-            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
-        } catch (Exception e) {
-            log.error("[{}] Failed to get topic backlog {}", clientAppId(), namespaceName, e);
-            throw new RestException(e);
-        }
+        return ret.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
+                .thenCompose(__ -> {
+                    PersistentOfflineTopicStats offlineTopicStats =
+                            pulsar().getBrokerService().getOfflineTopicStat(topicName);
+                    if (offlineTopicStats != null) {
+                        // offline topic stat has a cost - so use cached value until TTL
+                        long elapsedMs = System.currentTimeMillis() - offlineTopicStats.statGeneratedAt.getTime();
+                        if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < OFFLINE_TOPIC_STAT_TTL_MINS) {
+                            return CompletableFuture.completedFuture(offlineTopicStats);
+                        }
+                    }
 
-        PersistentOfflineTopicStats offlineTopicStats = null;
-        try {
+                    return pulsar().getBrokerService().getManagedLedgerConfig(topicName)
+                            .thenCompose(config -> {
+                                ManagedLedgerOfflineBacklog offlineTopicBacklog =
+                                        new ManagedLedgerOfflineBacklog(config.getDigestType(), config.getPassword(),
+                                                pulsar().getAdvertisedAddress(), false);
+                                try {
+                                    PersistentOfflineTopicStats estimateOfflineTopicStats =
+                                            offlineTopicBacklog.estimateUnloadedTopicBacklog(

Review Comment:
   I'm not sure, maybe we can make this method pure async.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org