You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:59 UTC
[pulsar] 05/06: [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ec0215c1cf613fc71899d6156920b0f7c1e00528
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 4 10:36:00 2022 +0300
[ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)
- getLedgerMetadata is an asynchronous operation, so the final result must not be completed until the
metadata for all ledgers has been retrieved.
(cherry picked from commit fe19b3ca949009c952270a64ba94dbb329ea572f)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++---------
1 file changed, 51 insertions(+), 35 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1c7297d880a..87f97ca8329 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Clock;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -4029,45 +4030,60 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString();
stats.state = this.getState().toString();
- stats.ledgers = Lists.newArrayList();
- this.getLedgersInfo().forEach((id, li) -> {
- ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
- info.ledgerId = li.getLedgerId();
- info.entries = li.getEntries();
- info.size = li.getSize();
- info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
- stats.ledgers.add(info);
- if (includeLedgerMetadata) {
- this.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
- if (ex == null) {
- info.metadata = lMetadata;
- }
+ stats.cursors = Maps.newTreeMap();
+ this.getCursors().forEach(c -> {
+ ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+ PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
+ cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+ cs.readPosition = cursor.getReadPosition().toString();
+ cs.waitingReadOp = cursor.hasPendingReadRequest();
+ cs.pendingReadOps = cursor.getPendingReadOpsCount();
+ cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+ cs.cursorLedger = cursor.getCursorLedger();
+ cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+ cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
+ cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
+ cs.state = cursor.getState();
+ cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
+ cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
+ cs.properties = cursor.getProperties();
+ stats.cursors.put(cursor.getName(), cs);
+ });
+
+ // make a snapshot of the ledgers infos since we are iterating it twice when metadata is included
+ // a list is sufficient since there's no need to lookup by the ledger id
+ List<LedgerInfo> ledgersInfos = new ArrayList<>(this.getLedgersInfo().values());
+
+ // add asynchronous metadata retrieval operations to a hashmap
+ Map<Long, CompletableFuture<String>> ledgerMetadataFutures = new HashMap();
+ if (includeLedgerMetadata) {
+ ledgersInfos.forEach(li -> {
+ long ledgerId = li.getLedgerId();
+ ledgerMetadataFutures.put(ledgerId, this.getLedgerMetadata(ledgerId).exceptionally(throwable -> {
+ log.warn("Getting metadata for ledger {} failed.", ledgerId, throwable);
return null;
- });
- }
+ }));
+ });
+ }
- stats.cursors = Maps.newTreeMap();
- this.getCursors().forEach(c -> {
- ManagedCursorImpl cursor = (ManagedCursorImpl) c;
- PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
- cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
- cs.readPosition = cursor.getReadPosition().toString();
- cs.waitingReadOp = cursor.hasPendingReadRequest();
- cs.pendingReadOps = cursor.getPendingReadOpsCount();
- cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
- cs.cursorLedger = cursor.getCursorLedger();
- cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
- cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
- cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
- cs.state = cursor.getState();
- cs.numberOfEntriesSinceFirstNotAckedMessage =
- cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
- cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
- cs.properties = cursor.getProperties();
- stats.cursors.put(cursor.getName(), cs);
+ // wait until metadata has been retrieved
+ FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ -> {
+ stats.ledgers = Lists.newArrayList();
+ ledgersInfos.forEach(li -> {
+ ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
+ info.ledgerId = li.getLedgerId();
+ info.entries = li.getEntries();
+ info.size = li.getSize();
+ info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+ if (includeLedgerMetadata) {
+ // lookup metadata from the hashmap which contains completed async operations
+ info.metadata = ledgerMetadataFutures.get(li.getLedgerId()).getNow(null);
+ }
+ stats.ledgers.add(info);
});
+ statFuture.complete(stats);
});
- statFuture.complete(stats);
+
return statFuture;
}