Posted to commits@pulsar.apache.org by si...@apache.org on 2020/06/02 21:39:51 UTC
[pulsar] branch branch-2.5 updated: Fix NPE on opening non-durable cursors on an empty managed ledger (#7133)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new bdb7f32 Fix NPE on opening non-durable cursors on an empty managed ledger (#7133)
bdb7f32 is described below
commit bdb7f327e90c25d9d8368cc09bd06c7f1fde4841
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 1 19:23:23 2020 -0700
Fix NPE on opening non-durable cursors on an empty managed ledger (#7133)
*Motivation*
When a reader attempts to open a non-durable cursor on an empty managed ledger, an NPE is thrown and the reader cannot be created on the topic. On the client side, reader creation fails with a lookup timeout:
```
2020-06-01 12:07:30.673 [pulsar-client-io-66-2] WARN org.apache.pulsar.client.impl.PulsarClientImpl - [persistent://public/default/test-partition-1] Failed to get create topic reader
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 10333 lookup request timedout after ms 30000
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) [na:1.8.0_242]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) [na:1.8.0_242]
at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:714) [na:1.8.0_242]
at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[na:1.8.0_242]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) [na:1.8.0_242]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) [na:1.8.0_242]
at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1026) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.client.impl.ClientCnx.lambda$channelActive$0(ClientCnx.java:187) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 10333 lookup request timedout after ms 30000
at org.apache.pulsar.client.impl.ClientCnx.checkRequestTimeout(ClientCnx.java:1025) ~[pulsar-flink-connector_2.11-2.4.20.jar:2.4.20]
... 10 common frames omitted
```
The underlying NPE on the broker:
```
Caused by: java.lang.NullPointerException
at org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl.recoverCursor(NonDurableCursorImpl.java:65) ~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
at org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl.<init>(NonDurableCursorImpl.java:51) ~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:855) ~[org.apache.pulsar-managed-ledger-2.5.1.jar:2.5.1]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:692) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274) ~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129) ~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:675) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:584) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at org.apache.pulsar.broker.service.ServerCnx.lambda$null$11(ServerCnx.java:824) ~[org.apache.pulsar-pulsar-broker-2.5.1.jar:2.5.1]
at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_242]
```
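The trace ends in `NonDurableCursorImpl.recoverCursor`, which computes the initial backlog by calling `readPosition.compareTo(...)`; when no valid read position was found, that call dereferences null. The following is a simplified, self-contained sketch of the guarded counter initialisation, not the real managed-ledger code. It assumes positions can be modelled as boxed `Long` entry ids within a single ledger, and that the number of entries in a closed range is `last - from + 1` (contiguous entries). The class `CursorCounterSketch` and its methods are hypothetical.

```java
import java.util.logging.Logger;

/**
 * Hypothetical stand-in for the counter initialisation in
 * NonDurableCursorImpl.recoverCursor. A missing read position is modelled as null.
 * Assumes a single ledger with contiguous entry ids.
 */
final class CursorCounterSketch {
    private static final Logger LOG = Logger.getLogger(CursorCounterSketch.class.getName());

    // Keeps its default value when no valid read position is available.
    private long messagesConsumedCounter = 0;

    /**
     * @param readPosition   entry id the cursor will read next, or null if none was found
     * @param lastEntry      entry id of the last entry written to the ledger
     * @param entriesWritten total number of entries written so far
     */
    void recover(Long readPosition, long lastEntry, long entriesWritten) {
        if (readPosition != null) {
            // Backlog = entries in the closed range [readPosition, lastEntry], mirroring
            // the original "compareTo(...) < 0 ? count : 0" expression.
            long initialBacklog = readPosition.compareTo(lastEntry) < 0
                    ? lastEntry - readPosition + 1
                    : 0;
            messagesConsumedCounter = entriesWritten - initialBacklog;
        } else {
            // Without this branch, readPosition.compareTo(...) throws a NullPointerException.
            LOG.warning("No valid read position found; leaving messagesConsumedCounter unchanged");
        }
    }

    long getMessagesConsumedCounter() {
        return messagesConsumedCounter;
    }
}
```

With `recover(null, -1L, 0L)` the counter stays at 0 and a warning is logged instead of an exception being thrown; with `recover(2L, 5L, 6L)` the backlog is 4 entries (2 through 5) and the counter becomes 2.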
*Modifications*
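- Change `PositionImpl.getNext()` so that an invalid entry (entryId < 0) advances to entryId 0 of the same ledger instead of entryId + 1
- Handle the case where `readPosition` is null in `NonDurableCursorImpl.recoverCursor()`, logging a warning instead of throwing an NPE
- Fix the `PersistentTopic` logic so that the batch-message adjustment of the start message id only applies to a valid entry position (ledgerId >= 0 && entryId >= 0)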
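The first change can be illustrated with a minimal stand-in for `PositionImpl` that keeps only the `(ledgerId, entryId)` pair and the new `getNext()` rule. `PositionSketch` is a hypothetical name; the real class has more behaviour (ordering, factory methods such as `PositionImpl.get`) that is omitted here.

```java
import java.util.Objects;

/**
 * Hypothetical, stripped-down (ledgerId, entryId) position that mirrors the
 * getNext() rule introduced in PositionImpl by this commit.
 */
final class PositionSketch {
    final long ledgerId;
    final long entryId;

    PositionSketch(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    /** Next position in the same ledger; any invalid entry id (< 0) maps to entry 0. */
    PositionSketch getNext() {
        if (entryId < 0) {
            return new PositionSketch(ledgerId, 0);
        }
        return new PositionSketch(ledgerId, entryId + 1);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PositionSketch)) {
            return false;
        }
        PositionSketch p = (PositionSketch) o;
        return ledgerId == p.ledgerId && entryId == p.entryId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId;
    }
}
```

Under the old rule, a position with entryId -2 advanced to entryId -1, which is still not a readable entry; under the new rule it advances directly to entry 0, while valid positions such as `3:4` still advance to `3:5`.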
---
.../mledger/impl/NonDurableCursorImpl.java | 9 +++++++--
.../bookkeeper/mledger/impl/PositionImpl.java | 6 +++++-
.../mledger/impl/NonDurableCursorTest.java | 21 +++++++++++++++++++++
.../broker/service/persistent/PersistentTopic.java | 3 ++-
4 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index a0720d4..2ed57fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -62,9 +62,14 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
- long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
+ if (null != this.readPosition) {
+ long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
- messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
+ messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
+ } else {
+ log.warn("Recovered a non-durable cursor from position {} but didn't find a valid read position {}",
+ mdPosition, readPosition);
+ }
}
@Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index f090b52..2afb739 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -72,7 +72,11 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
@Override
public PositionImpl getNext() {
- return PositionImpl.get(ledgerId, entryId + 1);
+ if (entryId < 0) {
+ return PositionImpl.get(ledgerId, 0);
+ } else {
+ return PositionImpl.get(ledgerId, entryId + 1);
+ }
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 26cad0a..e02f3db 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -77,6 +77,27 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
}
@Test(timeOut = 20000)
+ void testOpenNonDurableCursorAtNonExistentMessageId() throws Exception {
+ ManagedLedger ledger = factory.open("non_durable_cursor_at_non_existent_msgid");
+ ManagedLedgerImpl mlImpl = (ManagedLedgerImpl) ledger;
+
+ PositionImpl position = mlImpl.getLastPosition();
+
+ ManagedCursor c1 = ledger.newNonDurableCursor(new PositionImpl(
+ position.getLedgerId(),
+ position.getEntryId() - 1
+ ));
+
+ assertEquals(c1.getReadPosition(), new PositionImpl(
+ position.getLedgerId(),
+ 0
+ ));
+
+ c1.close();
+ ledger.close();
+ }
+
+ @Test(timeOut = 20000)
void testZNodeBypassed() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 28911bc..f22b4ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -682,7 +682,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
- if (ledgerId >= 0
+ // Ensure that the start message id starts from a valid entry.
+ if (ledgerId >= 0 && entryId >= 0
&& msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous message,
// because the "batch" might not have been consumed in its entirety.
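The `PersistentTopic` change tightens a single condition. The sketch below, using hypothetical names, contrasts the condition before and after the fix as pure predicates over the message id's fields; it does not model the batch step-back itself, whose body is cut off in this diff.

```java
/**
 * Hypothetical predicates contrasting the start-message-id condition in
 * PersistentTopic before and after this commit. Only the boolean logic is modelled.
 */
final class StartIdCheckSketch {
    private StartIdCheckSketch() {}

    /** Old condition: any non-negative ledger id combined with a batch message id. */
    static boolean oldStepBack(long ledgerId, long entryId, boolean isBatchMessageId) {
        return ledgerId >= 0 && isBatchMessageId;
    }

    /** New condition: the id must also point at a valid entry (entryId >= 0). */
    static boolean newStepBack(long ledgerId, long entryId, boolean isBatchMessageId) {
        return ledgerId >= 0 && entryId >= 0 && isBatchMessageId;
    }
}
```

For a batch message id with entryId -1, `oldStepBack(5, -1, true)` is `true` while `newStepBack(5, -1, true)` is `false`, so an invalid start position is no longer adjusted as if it were a partially consumed batch.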