Posted to commits@pulsar.apache.org by si...@apache.org on 2020/06/02 21:39:51 UTC

[pulsar] branch branch-2.5 updated: Fix NPE on opening non-durable cursors on an empty managed ledger (#7133)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new bdb7f32  Fix NPE on opening non-durable cursors on an empty managed ledger (#7133)
bdb7f32 is described below

commit bdb7f327e90c25d9d8368cc09bd06c7f1fde4841
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 1 19:23:23 2020 -0700

    Fix NPE on opening non-durable cursors on an empty managed ledger (#7133)
    
    *Motivation*
    
    When a reader attempts to open a non-durable cursor on an empty managed ledger, an NPE is thrown and the reader cannot be created on the topic.
    
    ```
    2020-06-01 12:07:30.673 [pulsar-client-io-66-2] WARN  org.apache.pulsar.client.impl.PulsarClientImpl  - [persistent://public/default/test-partition-1] Failed to get create topic reader
    java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 10333 lookup request timedout after ms 30000
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) [na:1.8.0_242]
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) [na:1.8.0_242]
            at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:714) [na:1.8.0_242]
            at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[na:1.8.0_242]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) [na:1.8.0_242]
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) [na:1.8.0_242]
            at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
    Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 10333 lookup request timedout after ms 30000
            at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
            ... 10 common frames omitted
    ```
    
    NPE:
    
    ```
    Caused by: java.lang.NullPointerException
            at org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl.recoverCursor(NonDurableCursorImpl.java:65) ~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
            at org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl.<init>(NonDurableCursorImpl.java:51) ~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:855) ~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:692) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) ~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) ~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:675) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
            at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
            at org.apache.pulsar.broker.service.ServerCnx.lambda$null$11(ServerCnx.java:824) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
            at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_242]
    ```
    
    *Modifications*
    
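    - Change `PositionImpl.getNext()` so that it returns entry 0 of the same ledger when the current position is an invalid entry (entryId < 0), instead of entryId + 1
    - Handle the case where `readPosition` is null in `NonDurableCursorImpl`, logging a warning instead of computing the initial backlog
    - Fix the `PersistentTopic` logic to only step back from a batch start message id when it points to a valid entry position (ledgerId >= 0 && entryId >= 0)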
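    The `PositionImpl` and `PersistentTopic` changes listed above come down to simple position arithmetic. The following is a minimal sketch using a hypothetical `PositionSketch.Pos` class (not Pulsar's real `PositionImpl`) that mirrors the patched `getNext()` and the new `ledgerId >= 0 && entryId >= 0` check. It assumes, as the new test suggests, that an empty managed ledger reports its last position with entryId -1, so "one entry before the last position" has entryId -2.

    ```java
    import java.util.Objects;

    // Minimal sketch; PositionSketch and Pos are hypothetical names, not Pulsar classes.
    public class PositionSketch {

        // Simplified stand-in for a managed-ledger position (ledgerId, entryId).
        static final class Pos {
            final long ledgerId;
            final long entryId;

            Pos(long ledgerId, long entryId) {
                this.ledgerId = ledgerId;
                this.entryId = entryId;
            }

            // Mirrors the patched getNext(): a negative (invalid) entryId maps to
            // entry 0 of the same ledger; otherwise advance by one entry.
            Pos next() {
                if (entryId < 0) {
                    return new Pos(ledgerId, 0);
                }
                return new Pos(ledgerId, entryId + 1);
            }

            // Pre-patch behaviour, kept only for comparison: always entryId + 1.
            Pos nextBeforeFix() {
                return new Pos(ledgerId, entryId + 1);
            }

            // Mirrors the condition added in PersistentTopic before stepping back
            // from a batch message id: both ids must be non-negative.
            boolean isValidEntry() {
                return ledgerId >= 0 && entryId >= 0;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (!(o instanceof Pos)) {
                    return false;
                }
                Pos other = (Pos) o;
                return ledgerId == other.ledgerId && entryId == other.entryId;
            }

            @Override
            public int hashCode() {
                return Objects.hash(ledgerId, entryId);
            }

            @Override
            public String toString() {
                return ledgerId + ":" + entryId;
            }
        }

        public static void main(String[] args) {
            // Assumption: an empty ledger's last position has entryId -1, so one
            // entry before it is entryId -2 (ledger id 3 is arbitrary).
            Pos start = new Pos(3, -2);
            System.out.println(start.nextBeforeFix()); // 3:-1 (still not a real entry)
            System.out.println(start.next());          // 3:0
            System.out.println(start.isValidEntry());  // false
            System.out.println(new Pos(3, 5).next());  // 3:6
        }
    }
    ```

    For entryId -1 the old and new `getNext()` agree (both yield entry 0); they only differ for entryId -2 or lower, which, under the assumption above, is the case the new `testOpenNonDurableCursorAtNonExistentMessageId` test exercises.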
---
 .../mledger/impl/NonDurableCursorImpl.java          |  9 +++++++--
 .../bookkeeper/mledger/impl/PositionImpl.java       |  6 +++++-
 .../mledger/impl/NonDurableCursorTest.java          | 21 +++++++++++++++++++++
 .../broker/service/persistent/PersistentTopic.java  |  3 ++-
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index a0720d4..2ed57fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -62,9 +62,14 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
 
         // Initialize the counter such that the difference between the messages written on the ML and the
         // messagesConsumed is equal to the current backlog (negated).
-        long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
+        if (null != this.readPosition) {
+            long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
                 ? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
-        messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
+            messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
+        } else {
+            log.warn("Recovered a non-durable cursor from position {} but didn't find a valid read position {}",
+                mdPosition, readPosition);
+        }
     }
 
     @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index f090b52..2afb739 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -72,7 +72,11 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
 
     @Override
     public PositionImpl getNext() {
-        return PositionImpl.get(ledgerId, entryId + 1);
+        if (entryId < 0) {
+            return PositionImpl.get(ledgerId, 0);
+        } else {
+            return PositionImpl.get(ledgerId, entryId + 1);
+        }
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 26cad0a..e02f3db 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -77,6 +77,27 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testOpenNonDurableCursorAtNonExistentMessageId() throws Exception {
+        ManagedLedger ledger = factory.open("non_durable_cursor_at_non_existent_msgid");
+        ManagedLedgerImpl mlImpl = (ManagedLedgerImpl) ledger;
+
+        PositionImpl position = mlImpl.getLastPosition();
+
+        ManagedCursor c1 = ledger.newNonDurableCursor(new PositionImpl(
+            position.getLedgerId(),
+            position.getEntryId() - 1
+        ));
+
+        assertEquals(c1.getReadPosition(), new PositionImpl(
+            position.getLedgerId(),
+            0
+        ));
+
+        c1.close();
+        ledger.close();
+    }
+
+    @Test(timeOut = 20000)
     void testZNodeBypassed() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 28911bc..f22b4ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -682,7 +682,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                 long ledgerId = msgId.getLedgerId();
                 long entryId = msgId.getEntryId();
-                if (ledgerId >= 0
+                // Ensure that the start message id starts from a valid entry.
+                if (ledgerId >= 0 && entryId >= 0
                         && msgId instanceof BatchMessageIdImpl) {
                     // When the start message is relative to a batch, we need to take one step back on the previous message,
                     // because the "batch" might not have been consumed in its entirety.