You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/12/30 06:02:17 UTC

[pulsar] branch master updated: [fix][broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#19017)

This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c611a1a2739 [fix][broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#19017)
c611a1a2739 is described below

commit c611a1a273920a17a5f0231dda2490d7a0261f9f
Author: houxiaoyu <ho...@apache.org>
AuthorDate: Fri Dec 30 14:02:07 2022 +0800

    [fix][broker] Fix estimateBacklogFromPosition if position is greater than the greatest ledgerId (#19017)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 18 ++++++++----------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java     | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c849a347c7d..290e4bb530f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1269,18 +1269,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     long estimateBacklogFromPosition(PositionImpl pos) {
         synchronized (this) {
+            long sizeBeforePosLedger = ledgers.headMap(pos.getLedgerId()).values()
+                    .stream().mapToLong(LedgerInfo::getSize).sum();
             LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+            long sizeAfter = getTotalSize() - sizeBeforePosLedger;
             if (ledgerInfo == null) {
-                return getTotalSize(); // position no longer in managed ledger, so return total size
-            }
-            long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
-                    .mapToLong(LedgerInfo::getSize).sum();
-            long size = getTotalSize() - sizeBeforePosLedger;
-
-            if (pos.getLedgerId() == currentLedger.getId()) {
-                return size - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
+                return sizeAfter;
+            } else if (pos.getLedgerId() == currentLedger.getId()) {
+                return sizeAfter - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
             } else {
-                return size - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
+                return sizeAfter - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
             }
         }
     }
@@ -1289,7 +1287,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (ledgerEntries <= 0) {
             return 0;
         }
-        if (ledgerEntries == (consumedEntries + 1)) {
+        if (ledgerEntries <= (consumedEntries + 1)) {
             return ledgerSize;
         } else {
             long averageSize = ledgerSize / ledgerEntries;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 29e30a958bd..1338c714c73 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3837,4 +3837,22 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         managedLedger.getEnsemblesAsync(lastLedger).join();
         Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
     }
+
+    @Test
+    public void testGetEstimatedBacklogSize() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setRetentionTime(-1, TimeUnit.SECONDS);
+        config.setRetentionSizeInMB(-1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testGetEstimatedBacklogSize", config);
+        List<Position> positions = new ArrayList<>(10);
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(new byte[1]));
+        }
+
+        Assert.assertEquals(ledger.getEstimatedBacklogSize(new PositionImpl(-1, -1)), 10);
+        Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) positions.get(1))), 8);
+        Assert.assertEquals(ledger.getEstimatedBacklogSize(((PositionImpl) positions.get(9)).getNext()), 0);
+        ledger.close();
+    }
 }