You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:41:08 UTC
[pulsar] 02/08: [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c57882972e981aa69907c06f84c5896cf82cab34
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 18:11:50 2021 -0600
[Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
(cherry picked from commit 02b8de039df38fa95656e7ca9b5c0babd25ff021)
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 02ea420..4fe5e9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1825,21 +1825,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
}
- if (!ledgers.containsKey(position.getLedgerId())) {
- log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
- + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
- callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
- + "the ledgerId does not belong to this topic or has been deleted"), ctx);
- return;
- }
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
- } else {
+ } else if (ledgers.containsKey(position.getLedgerId())) {
getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
+ } else {
+ log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+ callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ + "the ledgerId does not belong to this topic or has been deleted"), ctx);
}
}