You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 15:57:35 UTC
[pulsar] 09/11: [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit df0e021422ee17ee393dbb0bb9c125009ce5e5d2
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 18:11:50 2021 -0600
[Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
(cherry picked from commit 02b8de039df38fa95656e7ca9b5c0babd25ff021)
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7658ea7..f2fdb84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1650,21 +1650,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
}
- if (!ledgers.containsKey(position.getLedgerId())) {
- log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic.",
- name, position.getLedgerId(), position.getEntryId());
- callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
- + "the ledgerId does not belong to this topic or has been deleted"), ctx);
- return;
- }
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
- } else {
+ } else if (ledgers.containsKey(position.getLedgerId())) {
getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
+ } else {
+ log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+ callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ + "the ledgerId does not belong to this topic or has been deleted"), ctx);
}
}