You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/27 08:49:11 UTC

[GitHub] [pulsar] AnonHxy opened a new pull request, #16238: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

AnonHxy opened a new pull request, #16238:
URL: https://github.com/apache/pulsar/pull/16238

   ### Motivation
   * AdminApiTest is flaky. The testGetPartitionedStatsInternal test method fails sporadically.  This PR https://github.com/apache/pulsar/pull/13656 has tried to fix the bug, but I find the test `AdminApiTest#testGetPartitionedStatsInternal` can also be faiure.
    * The root cause is the `availableBookiesFuture#handle` can run before `availableBookiesFuture#whenComplete`. Because there is no guarantee that `CompleteFuture`  will run `handle` and `whenComplete` and so on in order. For examle,
   ```
       public static void main(String[] args) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CompletableFuture<String> appFuture = future.thenApply(s -> s);
   
           appFuture.handle((res, throwable) -> {
               System.out.println("handle");
               return null;
           });
           appFuture.whenComplete((res, ex) -> {
               System.out.println("complete");
           });
           future.complete("hello");
       }
   ```
   the result will be 
   ```
   handle
   complete
   ```
   But if we remove `thenApply` like this:
   ```
    public static void main(String[] args) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CompletableFuture<String> appFuture = future; // remove thenApply
   
           appFuture.handle((res, throwable) -> {
               System.out.println("handle");
               return null;
           });
           appFuture.whenComplete((res, ex) -> {
               System.out.println("complete");
           });
           future.complete("hello");
       }
   ```
   the result will be
   ```
   complete
   handle
   ```
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   * Moving `handle`  after `whenComplete`
   * Also remove  checking `futures != null` because futures can not be null
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   ### Documentation
   
     
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16238: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#issuecomment-1167085067

   Nice catch! 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16238: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#issuecomment-1167182819

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#discussion_r907971130


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1971,40 +1971,43 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<String>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+                .handle((strings, throwable) -> null));

Review Comment:
   Yes, it's necessary. Because `whenComplete` return `CompletableFuture<List<String>>`, but we need add`CompletableFuture<String>` to `futures` @HQebupt 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on a diff in pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#discussion_r907968607


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1971,40 +1971,43 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<String>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+                .handle((strings, throwable) -> null));

Review Comment:
   It the `handle ` necessary since it is similar with `whenComplete ?`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#issuecomment-1168259706

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#discussion_r907971130


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1971,40 +1971,43 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<String>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+                .handle((strings, throwable) -> null));

Review Comment:
   `handle ` return type is `CompleteFutre<String>` but `whenComplet` return type is `CompleteFutre<List<String>>`. I can change `futures` as `Set<CompletableFuture<?>>` , so `handle` is no necessary now. @HQebupt 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 merged pull request #16238: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
Jason918 merged PR #16238:
URL: https://github.com/apache/pulsar/pull/16238


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lhotari commented on a diff in pull request #16238: [fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#discussion_r917600097


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1971,40 +1971,43 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<?>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;

Review Comment:
   Mutating `info.metadata` and `info.underReplicated` this doesn't seem to be thread safe so some problems remain.
   
   btw. There was a similar bug pattern which was fixed in #15918.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on a diff in pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#discussion_r909410431


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1971,40 +1971,43 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<String>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+                .handle((strings, throwable) -> null));

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#issuecomment-1167443237

   @Jason918  @nicoloboschi @Technoboy- @codelipenghui  PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16238: [test][fix][broker]Fix getInternalStats occasional lack of LeaderInfo again

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16238:
URL: https://github.com/apache/pulsar/pull/16238#discussion_r907971130


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1971,40 +1971,43 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
         stats.state = ml.getState().toString();
 
         stats.ledgers = Lists.newArrayList();
-        List<CompletableFuture<String>> futures = Lists.newArrayList();
+        Set<CompletableFuture<String>> futures = Sets.newConcurrentHashSet();
         CompletableFuture<Set<String>> availableBookiesFuture =
                 brokerService.pulsar().getPulsarResources().getBookieResources().listAvailableBookiesAsync();
-        futures.add(availableBookiesFuture.handle((strings, throwable) -> null));
-        availableBookiesFuture.whenComplete((bookies, e) -> {
-            if (e != null) {
-                log.error("[{}] Failed to fetch available bookies.", topic, e);
-                statFuture.completeExceptionally(e);
-            } else {
-                ml.getLedgersInfo().forEach((id, li) -> {
-                    LedgerInfo info = new LedgerInfo();
-                    info.ledgerId = li.getLedgerId();
-                    info.entries = li.getEntries();
-                    info.size = li.getSize();
-                    info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-                    stats.ledgers.add(info);
-                    if (includeLedgerMetadata) {
-                        futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                            if (ex == null) {
-                                info.metadata = lMetadata;
-                            }
-                            return null;
-                        }));
-                        futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
-                            if (ex == null) {
-                                info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
-                                        .collect(Collectors.toList()));
+        futures.add(
+            availableBookiesFuture
+                .whenComplete((bookies, e) -> {
+                    if (e != null) {
+                        log.error("[{}] Failed to fetch available bookies.", topic, e);
+                        statFuture.completeExceptionally(e);
+                    } else {
+                        ml.getLedgersInfo().forEach((id, li) -> {
+                            LedgerInfo info = new LedgerInfo();
+                            info.ledgerId = li.getLedgerId();
+                            info.entries = li.getEntries();
+                            info.size = li.getSize();
+                            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                            stats.ledgers.add(info);
+                            if (includeLedgerMetadata) {
+                                futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                                    if (ex == null) {
+                                        info.metadata = lMetadata;
+                                    }
+                                    return null;
+                                }));
+                                futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
+                                    if (ex == null) {
+                                        info.underReplicated =
+                                            !bookies.containsAll(ensembles.stream().map(BookieId::toString)
+                                                .collect(Collectors.toList()));
+                                    }
+                                    return null;
+                                }));
                             }
-                            return null;
-                        }));
+                        });
                     }
-                });
-            }
-        });
+                })
+                .handle((strings, throwable) -> null));

Review Comment:
   Yes, it's necessary. Because `whenComplete` return CompletableFuture<List<String>>, but we need `CompletableFuture<String>` to `futures` @HQebupt 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org