You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/01 02:19:27 UTC

[pulsar] branch master updated: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a9af98050c0 [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)
a9af98050c0 is described below

commit a9af98050c0c50d3d9e25f1db50bf2df7584d2ba
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Fri Jul 1 10:19:19 2022 +0800

    [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)
    
    * Fix getInternalStats occasional lack of LeaderInfo again
    
    * Make the futures collection use a wildcard element type (CompletableFuture<?>)
---
 .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
 1 file changed, 37 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ae44520ee31..5457f815b98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1971,40 +1971,43 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+        );
 
         // Add ledger info for compacted topic ledger if exist.
         LedgerInfo info = new LedgerInfo();
@@ -2120,16 +2123,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         } else {
             schemaStoreLedgersFuture.complete(null);
         }
-        schemaStoreLedgersFuture.thenRun(() -> {
-            if (futures != null) {
-                FutureUtil.waitForAll(futures).handle((res, ex) -> {
-                    statFuture.complete(stats);
-                    return null;
-                });
-            } else {
+        schemaStoreLedgersFuture.thenRun(() ->
+            FutureUtil.waitForAll(futures).handle((res, ex) -> {
                 statFuture.complete(stats);
-            }
-        }).exceptionally(e -> {
+                return null;
+            })).exceptionally(e -> {
             statFuture.completeExceptionally(e);
             return null;
         });