You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:59 UTC

[pulsar] 05/06: [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ec0215c1cf613fc71899d6156920b0f7c1e00528
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 4 10:36:00 2022 +0300

    [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)
    
    - getLedgerMetadata is an asynchronous operation and the final result shouldn't complete before the
      metadata for all ledgers has been retrieved
    
    (cherry picked from commit fe19b3ca949009c952270a64ba94dbb329ea572f)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++---------
 1 file changed, 51 insertions(+), 35 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1c7297d880a..87f97ca8329 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.time.Clock;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -4029,45 +4030,60 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString();
         stats.state = this.getState().toString();
 
-        stats.ledgers = Lists.newArrayList();
-        this.getLedgersInfo().forEach((id, li) -> {
-            ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
-            info.ledgerId = li.getLedgerId();
-            info.entries = li.getEntries();
-            info.size = li.getSize();
-            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-            stats.ledgers.add(info);
-            if (includeLedgerMetadata) {
-                this.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                    if (ex == null) {
-                        info.metadata = lMetadata;
-                    }
+        stats.cursors = Maps.newTreeMap();
+        this.getCursors().forEach(c -> {
+            ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+            PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
+            cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+            cs.readPosition = cursor.getReadPosition().toString();
+            cs.waitingReadOp = cursor.hasPendingReadRequest();
+            cs.pendingReadOps = cursor.getPendingReadOpsCount();
+            cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+            cs.cursorLedger = cursor.getCursorLedger();
+            cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+            cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
+            cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
+            cs.state = cursor.getState();
+            cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
+            cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
+            cs.properties = cursor.getProperties();
+            stats.cursors.put(cursor.getName(), cs);
+        });
+
+        // make a snapshot of the ledger infos since they are iterated twice when metadata is included
+        // a list is sufficient since there's no need to look up by the ledger id
+        List<LedgerInfo> ledgersInfos = new ArrayList<>(this.getLedgersInfo().values());
+
+        // add asynchronous metadata retrieval operations to a hashmap
+        Map<Long, CompletableFuture<String>> ledgerMetadataFutures = new HashMap();
+        if (includeLedgerMetadata) {
+            ledgersInfos.forEach(li -> {
+                long ledgerId = li.getLedgerId();
+                ledgerMetadataFutures.put(ledgerId, this.getLedgerMetadata(ledgerId).exceptionally(throwable -> {
+                    log.warn("Getting metadata for ledger {} failed.", ledgerId, throwable);
                     return null;
-                });
-            }
+                }));
+            });
+        }
 
-            stats.cursors = Maps.newTreeMap();
-            this.getCursors().forEach(c -> {
-                ManagedCursorImpl cursor = (ManagedCursorImpl) c;
-                PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
-                cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
-                cs.readPosition = cursor.getReadPosition().toString();
-                cs.waitingReadOp = cursor.hasPendingReadRequest();
-                cs.pendingReadOps = cursor.getPendingReadOpsCount();
-                cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
-                cs.cursorLedger = cursor.getCursorLedger();
-                cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
-                cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
-                cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
-                cs.state = cursor.getState();
-                cs.numberOfEntriesSinceFirstNotAckedMessage =
-                        cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
-                cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
-                cs.properties = cursor.getProperties();
-                stats.cursors.put(cursor.getName(), cs);
+        // wait until metadata has been retrieved
+        FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ -> {
+            stats.ledgers = Lists.newArrayList();
+            ledgersInfos.forEach(li -> {
+                ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
+                info.ledgerId = li.getLedgerId();
+                info.entries = li.getEntries();
+                info.size = li.getSize();
+                info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                if (includeLedgerMetadata) {
+                    // lookup metadata from the hashmap which contains completed async operations
+                    info.metadata = ledgerMetadataFutures.get(li.getLedgerId()).getNow(null);
+                }
+                stats.ledgers.add(info);
             });
+            statFuture.complete(stats);
         });
-        statFuture.complete(stats);
+
         return statFuture;
     }