You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:18:33 UTC

[pulsar] 07/20: Avoid introduce null read position for the managed cursor. (#7264)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86c69e86db65c0e1d54a89dba41a6a2879b29767
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 16 09:20:18 2020 +0800

    Avoid introduce null read position for the managed cursor. (#7264)
    
    ### Motivation
    
    Avoid introduce null read position for the managed cursor.
    
    Here is the error log related to null read position:
    ```
    18:52:13.366 [pulsar-stats-updater-23-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - Got exception when creating consumer stats for subscription itom-di-dp-preload_chotest_2-reader-4bd4e3dd50: null
    java.lang.NullPointerException: null
    	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?]
    	at org.apache.bookkeeper.mledger.impl.PositionImpl.compareTo(PositionImpl.java:92) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage(ManagedCursorImpl.java:721) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage(PersistentSubscription.java:790) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$updateRates$46(PersistentTopic.java:1419) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.updateRates(PersistentTopic.java:1387) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:134) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_242]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_242]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
    ```
    The most doubtful thing is `getNextValidPosition` method in the ManagedLedgerImpl. If given a position which greater than the last add position, it will return a null value. This may cause the read position to become null. But I haven’t found how this situation appears. So in the PR, I added a log and print the stack trace which can help us to find the root cause and failback to the next position of the last position if the null next valid position occurs.
    
    (cherry picked from commit 7955cef6c5dff2f22cfc91e53d1d29562f232846)
---
 .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java   |  2 +-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 13 ++++++++++++-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 ++
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  2 +-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 304429d..66e977f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -780,7 +780,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // validate it before preparing range
         PositionImpl markDeletePosition = this.markDeletePosition;
         PositionImpl readPosition = this.readPosition;
-        return (markDeletePosition.compareTo(readPosition) < 0)
+        return (markDeletePosition != null && readPosition != null && markDeletePosition.compareTo(readPosition) < 0)
                 ? ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition))
                 : 0;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4b3937f..27d8848 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2864,11 +2864,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public PositionImpl getNextValidPosition(final PositionImpl position) {
+        PositionImpl next;
+        try {
+            next = getNextValidPositionInternal(position);
+        } catch (NullPointerException e) {
+            next = lastConfirmedEntry.getNext();
+            log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
+        }
+        return next;
+    }
+
+    public PositionImpl getNextValidPositionInternal(final PositionImpl position) {
         PositionImpl nextPosition = position.getNext();
         while (!isValidPosition(nextPosition)) {
             Long nextLedgerId = ledgers.ceilingKey(nextPosition.getLedgerId() + 1);
             if (nextLedgerId == null) {
-                return null;
+                throw new NullPointerException();
             }
             nextPosition = PositionImpl.get(nextLedgerId, 0);
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b5b702f..e507c99 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1966,6 +1966,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
         assertEquals(ledger.getNextValidPosition(p1), p2);
         assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a7fa72d..d7fcd41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -624,7 +624,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 future.completeExceptionally(e);
             }
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage());
+            log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex);
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
             return null;