You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/29 07:07:27 UTC
[pulsar] 04/09: [fix][broker] fix No such ledger exception (#16420)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0dc2cede6c76d23e02de039961977e38314a8d20
Author: LinChen <15...@qq.com>
AuthorDate: Fri Jul 8 21:36:12 2022 +0800
[fix][broker] fix No such ledger exception (#16420)
* fix No such ledger exception
* move currentLedgerEntries and currentLedgerSize
* move ledgers.put to operationComplete
* fix ledgers.put
* use ledgersTmp to write zookkeeper,after sucess,update ldgers
* update updateLedgersListAfterRollover
* put lh to ledgersTmp
* extend buildManagedLedgerInfo
* getManagedLedgerInfo(newLedger) after get metadataMutex
(cherry picked from commit 2e5fbbc09af28d1ba4d159ee0cdb71ec7735bd29)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 32 ++++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 44e0ed96a25..3a35d71736e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1417,11 +1417,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
lastLedgerCreationFailureTimestamp = clock.millis();
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
- ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
- currentLedger = lh;
- currentLedgerEntries = 0;
- currentLedgerSize = 0;
-
+ LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void v, Stat stat) {
@@ -1429,6 +1425,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
+ ledgers.put(lh.getId(), newLedger);
+ currentLedger = lh;
+ currentLedgerEntries = 0;
+ currentLedgerSize = 0;
metadataMutex.unlock();
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
@@ -1443,8 +1443,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
- // Remove the ledger, since we failed to update the list
- ledgers.remove(lh.getId());
mbean.startDataLedgerDeleteOp();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
mbean.endDataLedgerDeleteOp();
@@ -1479,21 +1477,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
};
- updateLedgersListAfterRollover(cb);
+ updateLedgersListAfterRollover(cb, newLedger);
}
}
-
- private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
+ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
if (!metadataMutex.tryLock()) {
// Defer update for later
- scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
+ scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, newLedger),
+ 100, TimeUnit.MILLISECONDS);
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat);
}
- store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback);
+ ManagedLedgerInfo mlInfo = getManagedLedgerInfo(newLedger);
+ store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
}
public synchronized void updateLedgersIdsComplete(Stat stat) {
@@ -3483,8 +3482,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return mlInfo.build();
}
+ private ManagedLedgerInfo getManagedLedgerInfo(LedgerInfo newLedger) {
+ ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values())
+ .addLedgerInfo(newLedger);
+ return buildManagedLedgerInfo(mlInfo);
+ }
private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> ledgers) {
ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values());
+ return buildManagedLedgerInfo(mlInfo);
+ }
+
+ private ManagedLedgerInfo buildManagedLedgerInfo(ManagedLedgerInfo.Builder mlInfo) {
if (state == State.Terminated) {
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
.setEntryId(lastConfirmedEntry.getEntryId()));