You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/07 06:30:17 UTC

[pulsar] branch branch-2.7 updated: [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry`(#16966)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 9abab5b4a32 [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry`(#16966)
9abab5b4a32 is described below

commit 9abab5b4a3206e84d6a1164dca76c04e51ffaeaa
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Sun Aug 7 14:30:12 2022 +0800

    [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry`(#16966)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 ++------
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 27 ++++++++++++++++++++++
 3 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index babe6320ae0..9faf714ff36 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1969,14 +1969,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
+    PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
-        if (null == ledgerId) {
-            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
-                    "least key greater than or equal to the given key, or null if there is no such key"), null);
-        }
-
-        if (ledgerId != position.getLedgerId()) {
+        if (ledgerId != null && ledgerId != position.getLedgerId()) {
             // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
             // to skip on the next available ledger
             position = new PositionImpl(ledgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 768b673966b..1feebb7e9f2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -47,7 +47,7 @@ class OpReadEntry implements ReadEntriesCallback {
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
             ReadEntriesCallback callback, Object ctx) {
         OpReadEntry op = RECYCLER.get();
-        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
+        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
         op.count = count;
         op.callback = callback;
@@ -132,7 +132,7 @@ class OpReadEntry implements ReadEntriesCallback {
         if (entries.size() < count && cursor.hasMoreEntries()) {
             // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
-                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
+                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d6fb1bee585..dcbf6fe26b0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -49,6 +49,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -409,6 +411,31 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("my_test_ledger_1");
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
+        LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
+        ledgers.clear();
+        ManagedCursor c1 = ledger.openCursor("c1");
+        PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
+        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
+                new ReadEntriesCallback() {
+
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+                    }
+                }, null);
+        Assert.assertEquals(opReadEntry.readPosition, position);
+    }
+
     @Test(timeOut = 20000)
     public void spanningMultipleLedgersWithSize() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);