You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/12/30 06:02:17 UTC
[pulsar] branch master updated: [fix][broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#19017)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c611a1a2739 [fix][broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#19017)
c611a1a2739 is described below
commit c611a1a273920a17a5f0231dda2490d7a0261f9f
Author: houxiaoyu <ho...@apache.org>
AuthorDate: Fri Dec 30 14:02:07 2022 +0800
[fix][broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#19017)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++++++++----------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 18 ++++++++++++++++++
2 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c849a347c7d..290e4bb530f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1269,18 +1269,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
long estimateBacklogFromPosition(PositionImpl pos) {
synchronized (this) {
+ long sizeBeforePosLedger = ledgers.headMap(pos.getLedgerId()).values()
+ .stream().mapToLong(LedgerInfo::getSize).sum();
LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+ long sizeAfter = getTotalSize() - sizeBeforePosLedger;
if (ledgerInfo == null) {
- return getTotalSize(); // position no longer in managed ledger, so return total size
- }
- long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
- .mapToLong(LedgerInfo::getSize).sum();
- long size = getTotalSize() - sizeBeforePosLedger;
-
- if (pos.getLedgerId() == currentLedger.getId()) {
- return size - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
+ return sizeAfter;
+ } else if (pos.getLedgerId() == currentLedger.getId()) {
+ return sizeAfter - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
} else {
- return size - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
+ return sizeAfter - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
}
}
}
@@ -1289,7 +1287,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (ledgerEntries <= 0) {
return 0;
}
- if (ledgerEntries == (consumedEntries + 1)) {
+ if (ledgerEntries <= (consumedEntries + 1)) {
return ledgerSize;
} else {
long averageSize = ledgerSize / ledgerEntries;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 29e30a958bd..1338c714c73 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3837,4 +3837,22 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
managedLedger.getEnsemblesAsync(lastLedger).join();
Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
}
+
+ @Test
+ public void testGetEstimatedBacklogSize() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setRetentionTime(-1, TimeUnit.SECONDS);
+ config.setRetentionSizeInMB(-1);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testGetEstimatedBacklogSize", config);
+ List<Position> positions = new ArrayList<>(10);
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry(new byte[1]));
+ }
+
+ Assert.assertEquals(ledger.getEstimatedBacklogSize(new PositionImpl(-1, -1)), 10);
+ Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) positions.get(1))), 8);
+ Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) positions.get(9)).getNext()), 0);
+ ledger.close();
+ }
}