You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/15 02:36:32 UTC
[pulsar] 01/15: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cbb060ec3e6486ee5a6df601f2724378e07b286c
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Fri Jul 1 10:19:19 2022 +0800
[fix][broker]Fix getInternalStats occasional lack of LeaderInfo again (#16238)
* Fix getInternalStats occasional lack of LeaderInfo again
* Make the futures collection use a wildcard type
(cherry picked from commit a9af98050c0c50d3d9e25f1db50bf2df7584d2ba)
---
.../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
1 file changed, 37 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index be5d6095e01..5be91078b14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1972,40 +1972,43 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.state = ml.getState().toString();
stats.ledgers = Lists.newArrayList();
- List<CompletableFuture<String>> futures = Lists.newArrayList();
+ Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet();
CompletableFuture<Set<String>> availableBookiesFuture =
brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
- futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
- availableBookiesFuture.whenComplete((bookies, e) -> {
- if (e != null) {
- log.error("[{}] Failed to fetch available bookies.", topic, e);
- statFuture.completeExceptionally(e);
- } else {
- ml.getLedgersInfo().forEach((id, li) -> {
- LedgerInfo info = new LedgerInfo();
- info.ledgerId = li.getLedgerId();
- info.entries = li.getEntries();
- info.size = li.getSize();
- info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
- stats.ledgers.add(info);
- if (includeLedgerMetadata) {
- futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
- if (ex == null) {
- info.metadata = lMetadata;
- }
- return null;
- }));
- futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
- if (ex == null) {
- info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
- .collect(Collectors.toList()));
+ futures.add(
+ availableBookiesFuture
+ .whenComplete((bookies, e) -> {
+ if (e != null) {
+ log.error("[{}] Failed to fetch available bookies.", topic, e);
+ statFuture.completeExceptionally(e);
+ } else {
+ ml.getLedgersInfo().forEach((id, li) -> {
+ LedgerInfo info = new LedgerInfo();
+ info.ledgerId = li.getLedgerId();
+ info.entries = li.getEntries();
+ info.size = li.getSize();
+ info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+ stats.ledgers.add(info);
+ if (includeLedgerMetadata) {
+ futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+ if (ex == null) {
+ info.metadata = lMetadata;
+ }
+ return null;
+ }));
+ futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+ if (ex == null) {
+ info.underReplicated =
+ !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+ .collect(Collectors.toList()));
+ }
+ return null;
+ }));
}
- return null;
- }));
+ });
}
- });
- }
- });
+ })
+ );
// Add ledger info for compacted topic ledger if exist.
LedgerInfo info = new LedgerInfo();
@@ -2121,16 +2124,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
} else {
schemaStoreLedgersFuture.complete(null);
}
- schemaStoreLedgersFuture.thenRun(() -> {
- if (futures != null) {
- FutureUtil.waitForAll(futures).handle((res, ex) -> {
- statFuture.complete(stats);
- return null;
- });
- } else {
+ schemaStoreLedgersFuture.thenRun(() ->
+ FutureUtil.waitForAll(futures).handle((res, ex) -> {
statFuture.complete(stats);
- }
- }).exceptionally(e -> {
+ return null;
+ })).exceptionally(e -> {
statFuture.completeExceptionally(e);
return null;
});