You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/07 13:52:12 UTC
[pulsar] branch master updated: [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a3ad611f51 [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
7a3ad611f51 is described below
commit 7a3ad611f51511afca4bcaa1de299517a1907e8e
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Jun 7 21:52:05 2022 +0800
[Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++------
.../bookkeeper/mledger/impl/OpReadEntry.java | 4 ++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 28 ++++++++++++++++++++++
3 files changed, 32 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6b7eb90e0d8..f3bcb611ca3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2240,15 +2240,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
- PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
+ PositionImpl startReadOperationOnLedger(PositionImpl position) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
- if (null == ledgerId) {
- opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key"
- + ") method is used to return the least key greater than or equal to the given key, "
- + "or null if there is no such key"), null);
- }
-
- if (ledgerId != position.getLedgerId()) {
+ if (ledgerId != null && ledgerId != position.getLedgerId()) {
// The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
// to skip on the next available ledger
position = new PositionImpl(ledgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 006accaf252..a805802e633 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -48,7 +48,7 @@ class OpReadEntry implements ReadEntriesCallback {
public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
OpReadEntry op = RECYCLER.get();
- op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
+ op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
op.count = count;
op.callback = callback;
@@ -140,7 +140,7 @@ class OpReadEntry implements ReadEntriesCallback {
// We still have more entries to read from the next ledger, schedule a new async operation
cursor.ledger.getExecutor().execute(safeRun(() -> {
- readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
+ readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));
} else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 32265cd49ac..04f75dd7232 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -53,6 +53,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -611,6 +612,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
+ ManagedLedger ledger = factory.open("my_test_ledger_1");
+ ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+ NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
+ LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
+ ledgers.clear();
+ ManagedCursor c1 = ledger.openCursor("c1");
+ PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
+ PositionImpl maxPosition = new PositionImpl(ledgerInfo.getLedgerId(), 99);
+ OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
+ new ReadEntriesCallback() {
+
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+ }
+ }, null, maxPosition);
+ Assert.assertEquals(opReadEntry.readPosition, position);
+ }
+
+
@Test(timeOut = 20000)
public void spanningMultipleLedgersWithSize() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);