You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/10 15:01:55 UTC

[pulsar] 11/12: [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 53615e05cff4010ef0834f15f057b78c1eb6fc77
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Jun 7 21:52:05 2022 +0800

    [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
    
    (cherry picked from commit 7a3ad611f51511afca4bcaa1de299517a1907e8e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 ++----
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 +--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 32 ++++++++++++++++++++--
 .../broker/service/MessageCumulativeAckTest.java   | 15 ++++------
 4 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d03d98a3f06..c79b8a9f8f7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2153,14 +2153,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
+    PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
-        if (null == ledgerId) {
-            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
-                    "least key greater than or equal to the given key, or null if there is no such key"), null);
-        }
-
-        if (ledgerId != position.getLedgerId()) {
+        if (ledgerId != null && ledgerId != position.getLedgerId()) {
             // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
             // to skip on the next available ledger
             position = new PositionImpl(ledgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index d7eb0467f56..27e99169e31 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -48,7 +48,7 @@ class OpReadEntry implements ReadEntriesCallback {
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
             ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
         OpReadEntry op = RECYCLER.get();
-        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
+        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
         op.count = count;
         op.callback = callback;
@@ -140,7 +140,7 @@ class OpReadEntry implements ReadEntriesCallback {
 
             // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
-                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
+                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8826f0d99fc..317fb7e2b30 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -408,6 +409,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("my_test_ledger_1");
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
+        LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
+        ledgers.clear();
+        ManagedCursor c1 = ledger.openCursor("c1");
+        PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
+        PositionImpl maxPosition = new PositionImpl(ledgerInfo.getLedgerId(), 99);
+        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
+                new ReadEntriesCallback() {
+
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+                    }
+                }, null, maxPosition);
+        Assert.assertEquals(opReadEntry.readPosition, position);
+    }
+
+
     @Test(timeOut = 20000)
     public void spanningMultipleLedgersWithSize() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);
@@ -2262,8 +2290,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 2);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+            assertEquals(managedLedger.getLedgersInfo().size(), 3);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
         });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 86754efc0c2..d45054fab79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclus
 import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
 import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
 import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
-import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -39,7 +38,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
-import java.util.Optional;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -79,7 +77,7 @@ public class MessageCumulativeAckTest {
         executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
         ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
         svcConfig.setBrokerShutdownTimeoutMs(0L);
-        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(1.0d);
         svcConfig.setClusterName("pulsar-cluster");
         pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
         doReturn(svcConfig).when(pulsar).getConfiguration();
@@ -89,7 +87,7 @@ public class MessageCumulativeAckTest {
         doReturn(TransactionTestBase.createMockBookKeeper(executor))
             .when(pulsar).getBookKeeperClient();
 
-        store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
+        store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
         doReturn(store).when(pulsar).getLocalMetadataStore();
         doReturn(store).when(pulsar).getConfigurationMetadataStore();
 
@@ -154,8 +152,7 @@ public class MessageCumulativeAckTest {
     @Test(timeOut = 5000, dataProvider = "individualAckModes")
     public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
         Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
-            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+            "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);
@@ -169,8 +166,7 @@ public class MessageCumulativeAckTest {
     @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
     public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
         Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
-            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);
@@ -184,8 +180,7 @@ public class MessageCumulativeAckTest {
     @Test(timeOut = 5000)
     public void testAckWithMoreThanNoneMessageIds() throws Exception {
         Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
-            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);