You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 15:57:35 UTC

[pulsar] 09/11: [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit df0e021422ee17ee393dbb0bb9c125009ce5e5d2
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 18:11:50 2021 -0600

    [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
    
    (cherry picked from commit 02b8de039df38fa95656e7ca9b5c0babd25ff021)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7658ea7..f2fdb84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1650,21 +1650,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
         }
-        if (!ledgers.containsKey(position.getLedgerId())) {
-            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic.",
-                name, position.getLedgerId(), position.getEntryId());
-            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
-                + "the ledgerId does not belong to this topic or has been deleted"), ctx);
-            return;
-        }
         if (position.getLedgerId() == currentLedger.getId()) {
             asyncReadEntry(currentLedger, position, callback, ctx);
-        } else {
+        } else if (ledgers.containsKey(position.getLedgerId())) {
             getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
                 callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
                 return null;
             });
+        } else {
+            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+                    + "the ledgerId does not belong to this topic or has been deleted"), ctx);
         }
 
     }