You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/07 08:11:55 UTC

[pulsar] branch master updated: [Improve][Broker]Reduce PartitionedStats local REST call (#16916)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d1cf3cf19c5 [Improve][Broker]Reduce PartitionedStats local REST call (#16916)
d1cf3cf19c5 is described below

commit d1cf3cf19c588cca3fb14da28648f8d762659b9c
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sun Aug 7 16:11:49 2022 +0800

    [Improve][Broker]Reduce PartitionedStats local REST call (#16916)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 35 ++++++++++++++--------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6f48145b0ed..4b635f205e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1342,17 +1342,29 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
             PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
-            List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+            List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
             for (int i = 0; i < partitionMetadata.partitions; i++) {
-                try {
-                    topicStatsFutureList
-                            .add(pulsar().getAdminClient().topics().getStatsAsync(
-                                    (topicName.getPartition(i).toString()), getPreciseBacklog, subscriptionBacklogSize,
-                                    getEarliestTimeInBacklog));
-                } catch (PulsarServerException e) {
-                    asyncResponse.resume(new RestException(e));
-                    return;
-                }
+                TopicName partition = topicName.getPartition(i);
+                topicStatsFutureList.add(
+                    pulsar().getNamespaceService()
+                        .isServiceUnitOwnedAsync(partition)
+                        .thenCompose(owned -> {
+                            if (owned) {
+                                return getTopicReferenceAsync(partition)
+                                    .thenApply(ref ->
+                                        ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
+                                            getEarliestTimeInBacklog));
+                            } else {
+                                try {
+                                    return pulsar().getAdminClient().topics().getStatsAsync(
+                                        partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
+                                        getEarliestTimeInBacklog);
+                                } catch (PulsarServerException e) {
+                                    return FutureUtil.failedFuture(e);
+                                }
+                            }
+                        })
+                );
             }
 
             FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
@@ -1363,8 +1375,7 @@ public class PersistentTopicsBase extends AdminResource {
                         try {
                             stats.add(statFuture.get());
                             if (perPartition) {
-                                stats.getPartitions().put(topicName.getPartition(i).toString(),
-                                        (TopicStatsImpl) statFuture.get());
+                                stats.getPartitions().put(topicName.getPartition(i).toString(), statFuture.get());
                             }
                         } catch (Exception e) {
                             asyncResponse.resume(new RestException(e));