You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/07 08:11:55 UTC
[pulsar] branch master updated: [Improve][Broker]Reduce PartitionedStats local REST call (#16916)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d1cf3cf19c5 [Improve][Broker]Reduce PartitionedStats local REST call (#16916)
d1cf3cf19c5 is described below
commit d1cf3cf19c588cca3fb14da28648f8d762659b9c
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sun Aug 7 16:11:49 2022 +0800
[Improve][Broker]Reduce PartitionedStats local REST call (#16916)
---
.../broker/admin/impl/PersistentTopicsBase.java | 35 ++++++++++++++--------
1 file changed, 23 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6f48145b0ed..4b635f205e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1342,17 +1342,29 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
- List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+ List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
for (int i = 0; i < partitionMetadata.partitions; i++) {
- try {
- topicStatsFutureList
- .add(pulsar().getAdminClient().topics().getStatsAsync(
- (topicName.getPartition(i).toString()), getPreciseBacklog, subscriptionBacklogSize,
- getEarliestTimeInBacklog));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
+ TopicName partition = topicName.getPartition(i);
+ topicStatsFutureList.add(
+ pulsar().getNamespaceService()
+ .isServiceUnitOwnedAsync(partition)
+ .thenCompose(owned -> {
+ if (owned) {
+ return getTopicReferenceAsync(partition)
+ .thenApply(ref ->
+ ref.getStats(getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog));
+ } else {
+ try {
+ return pulsar().getAdminClient().topics().getStatsAsync(
+ partition.toString(), getPreciseBacklog, subscriptionBacklogSize,
+ getEarliestTimeInBacklog);
+ } catch (PulsarServerException e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+ })
+ );
}
FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
@@ -1363,8 +1375,7 @@ public class PersistentTopicsBase extends AdminResource {
try {
stats.add(statFuture.get());
if (perPartition) {
- stats.getPartitions().put(topicName.getPartition(i).toString(),
- (TopicStatsImpl) statFuture.get());
+ stats.getPartitions().put(topicName.getPartition(i).toString(), statFuture.get());
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));