You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/02 04:04:58 UTC

[pulsar] branch branch-2.9 updated: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 15895c3ab0c [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)
15895c3ab0c is described below

commit 15895c3ab0cc9ec2146ec95deddb9cd4c6a0b283
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Fri Jul 1 10:19:19 2022 +0800

    [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)
    
    * Fix getInternalStats occasional lack of LeaderInfo again
    
    * Make futures as wildcard
    
    (cherry picked from commit a9af98050c0c50d3d9e25f1db50bf2df7584d2ba)
---
 .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
 1 file changed, 37 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 129333581ba..8c592a7e29c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2023,40 +2023,43 @@ public class PersistentTopic extends AbstractTopic
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+        );
 
         // Add ledger info for compacted topic ledger if exist.
         LedgerInfo info = new LedgerInfo();
@@ -2172,16 +2175,11 @@ public class PersistentTopic extends AbstractTopic
         } else {
             schemaStoreLedgersFuture.complete(null);
         }
-        schemaStoreLedgersFuture.thenRun(() -> {
-            if (futures != null) {
-                FutureUtil.waitForAll(futures).handle((res, ex) -> {
-                    statFuture.complete(stats);
-                    return null;
-                });
-            } else {
+        schemaStoreLedgersFuture.thenRun(() ->
+            FutureUtil.waitForAll(futures).handle((res, ex) -> {
                 statFuture.complete(stats);
-            }
-        }).exceptionally(e -> {
+                return null;
+            })).exceptionally(e -> {
             statFuture.completeExceptionally(e);
             return null;
         });