You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/07 07:06:53 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 49a3a137ea4 [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)
49a3a137ea4 is described below
commit 49a3a137ea49d5429d7b92f05560d1690836a31d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Oct 13 14:46:14 2022 +0800
[fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)
(cherry picked from commit c73285205dafaf5bed827ad99f7fb8edd3ddb7f2)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++------------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 +++++++++++++++++
2 files changed, 22 insertions(+), 12 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c95e1ad66ee..68263834705 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3282,20 +3282,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
long entriesToSkip = n;
long currentLedgerId;
long currentEntryId;
-
if (startRange == PositionBound.startIncluded) {
currentLedgerId = startPosition.getLedgerId();
currentEntryId = startPosition.getEntryId();
} else {
- // e.g. a mark-delete position
PositionImpl nextValidPosition = getNextValidPosition(startPosition);
currentLedgerId = nextValidPosition.getLedgerId();
currentEntryId = nextValidPosition.getEntryId();
}
-
boolean lastLedger = false;
long totalEntriesInCurrentLedger;
-
while (entriesToSkip >= 0) {
// for the current ledger, the number of entries written is deduced from the lastConfirmedEntry
// for previous ledgers, LedgerInfo in ZK has the number of entries
@@ -3310,10 +3306,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
LedgerInfo ledgerInfo = ledgers.get(currentLedgerId);
totalEntriesInCurrentLedger = ledgerInfo != null ? ledgerInfo.getEntries() : 0;
}
-
-
- long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId;
-
+ long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger > 0
+ ? totalEntriesInCurrentLedger - currentEntryId : 0;
if (unreadEntriesInCurrentLedger >= entriesToSkip) {
// if the current ledger has more entries than what we need to skip
// then the return position is in the same ledger
@@ -3326,11 +3320,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// there are no more ledgers, return the last position
currentEntryId = totalEntriesInCurrentLedger;
break;
- } else {
- Long lid = ledgers.ceilingKey(currentLedgerId + 1);
- currentLedgerId = lid != null ? lid : (ledgers.lastKey() + 1);
- currentEntryId = 0;
}
+ Long lid = ledgers.ceilingKey(currentLedgerId + 1);
+ currentLedgerId = lid != null ? lid : ledgers.lastKey();
+ currentEntryId = 0;
}
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 753c34f38ef..99a9307abd5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2301,6 +2301,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
log.info("Target position is {}", targetPosition);
assertEquals(targetPosition.getLedgerId(), secondLedger);
assertEquals(targetPosition.getEntryId(), 4);
+
+ // test for n > NumberOfEntriesInStorage
+ searchPosition = new PositionImpl(secondLedger, 0);
+ targetPosition = managedLedger.getPositionAfterN(searchPosition, 100, ManagedLedgerImpl.PositionBound.startIncluded);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
+
+ // test for startPosition > current ledger
+ searchPosition = new PositionImpl(999, 0);
+ targetPosition = managedLedger.getPositionAfterN(searchPosition, 0, ManagedLedgerImpl.PositionBound.startIncluded);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
+
+ searchPosition = new PositionImpl(999, 0);
+ targetPosition = managedLedger.getPositionAfterN(searchPosition, 10, ManagedLedgerImpl.PositionBound.startExcluded);
+ assertEquals(targetPosition.getLedgerId(), secondLedger);
+ assertEquals(targetPosition.getEntryId(), 4);
}
@Test