Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/16 01:20:30 UTC

[pulsar] branch master updated: Avoid introduce null read position for the managed cursor. (#7264)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7955cef  Avoid introduce null read position for the managed cursor. (#7264)
7955cef is described below

commit 7955cef6c5dff2f22cfc91e53d1d29562f232846
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 16 09:20:18 2020 +0800

    Avoid introduce null read position for the managed cursor. (#7264)
    
    ### Motivation
    
    Avoid introducing a null read position for the managed cursor.
    
    Here is the error log related to null read position:
    ```
    18:52:13.366 [pulsar-stats-updater-23-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - Got exception when creating consumer stats for subscription itom-di-dp-preload_chotest_2-reader-4bd4e3dd50: null
    java.lang.NullPointerException: null
    	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?]
    	at org.apache.bookkeeper.mledger.impl.PositionImpl.compareTo(PositionImpl.java:92) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage(ManagedCursorImpl.java:721) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage(PersistentSubscription.java:790) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$updateRates$46(PersistentTopic.java:1419) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.updateRates(PersistentTopic.java:1387) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:134) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_242]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_242]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
    ```
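    The trace shows `PositionImpl.compareTo` failing inside Guava's `Preconditions.checkNotNull`, meaning a null position was passed to a comparison. The minimal sketch below shows that failure mode alongside a null guard like the one this patch adds in `ManagedCursorImpl`. It makes two substitutions: `Pos` is a hypothetical stand-in for `PositionImpl`, and `Objects.requireNonNull` stands in for `checkNotNull`.

    ```java
    import java.util.Objects;

    public class NullReadPositionDemo {
        // Hypothetical stand-in for PositionImpl: a (ledgerId, entryId) pair.
        static final class Pos implements Comparable<Pos> {
            final long ledgerId;
            final long entryId;

            Pos(long ledgerId, long entryId) {
                this.ledgerId = ledgerId;
                this.entryId = entryId;
            }

            @Override
            public int compareTo(Pos that) {
                // Like the checkNotNull in the trace: a null argument throws NullPointerException.
                Objects.requireNonNull(that);
                int c = Long.compare(ledgerId, that.ledgerId);
                return c != 0 ? c : Long.compare(entryId, that.entryId);
            }
        }

        // Unguarded comparison: throws if readPosition is null.
        static boolean hasBacklogUnguarded(Pos markDeletePosition, Pos readPosition) {
            return markDeletePosition.compareTo(readPosition) < 0;
        }

        // Guarded comparison: treats a missing position as "no backlog" instead of throwing.
        static boolean hasBacklogGuarded(Pos markDeletePosition, Pos readPosition) {
            return markDeletePosition != null && readPosition != null
                    && markDeletePosition.compareTo(readPosition) < 0;
        }

        public static void main(String[] args) {
            Pos markDelete = new Pos(3, 5);
            System.out.println(hasBacklogGuarded(markDelete, new Pos(3, 9))); // true
            System.out.println(hasBacklogGuarded(markDelete, null));          // false
            try {
                hasBacklogUnguarded(markDelete, null);
            } catch (NullPointerException e) {
                System.out.println("NullPointerException, as in the stack trace above");
            }
        }
    }
    ```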
    The most likely culprit is the `getNextValidPosition` method in `ManagedLedgerImpl`. When it is given a position greater than the last added position, it returns null, which can cause the cursor's read position to become null. I haven't yet found how this situation arises. This PR therefore logs an error with the stack trace to help track down the root cause. It also falls back to the position following the last confirmed entry whenever no valid next position is found.
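    The following is a minimal sketch of the fallback described above, built on a deliberately simplified model. `Pos`, `ledgers` (ledger ID to entry count), `lastConfirmed`, and the validity rule in `isValid` are hypothetical stand-ins, not the real `PositionImpl`/`ManagedLedgerImpl` internals. One difference from the patch: the patch throws and catches a `NullPointerException` inside `getNextValidPosition`, whereas this sketch signals "no later ledger" with an `Optional`.

    ```java
    import java.util.Optional;
    import java.util.TreeMap;

    public class NextValidPositionSketch {
        // Hypothetical position type: (ledgerId, entryId).
        static final class Pos {
            final long ledgerId;
            final long entryId;

            Pos(long ledgerId, long entryId) { this.ledgerId = ledgerId; this.entryId = entryId; }

            Pos next() { return new Pos(ledgerId, entryId + 1); }

            @Override public boolean equals(Object o) {
                if (!(o instanceof Pos)) return false;
                Pos p = (Pos) o;
                return p.ledgerId == ledgerId && p.entryId == entryId;
            }

            @Override public int hashCode() { return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId); }

            @Override public String toString() { return ledgerId + ":" + entryId; }
        }

        // Hypothetical model: ledgerId -> number of entries, plus the last confirmed entry.
        final TreeMap<Long, Long> ledgers = new TreeMap<>();
        Pos lastConfirmed;

        // Simplified validity rule (an assumption): the last ledger may point one past its last entry.
        boolean isValid(Pos p) {
            if (p.entryId < 0 || p.ledgerId > lastConfirmed.ledgerId) return false;
            if (p.ledgerId == lastConfirmed.ledgerId) return p.entryId <= lastConfirmed.entryId + 1;
            Long entries = ledgers.get(p.ledgerId);
            return entries != null && p.entryId < entries;
        }

        // Walks forward through later ledgers; empty when no later ledger exists.
        Optional<Pos> nextValidInternal(Pos position) {
            Pos next = position.next();
            while (!isValid(next)) {
                Long nextLedgerId = ledgers.ceilingKey(next.ledgerId + 1);
                if (nextLedgerId == null) return Optional.empty();
                next = new Pos(nextLedgerId, 0);
            }
            return Optional.of(next);
        }

        // Never returns null: falls back to the position after the last confirmed entry.
        Pos nextValid(Pos position) {
            return nextValidInternal(position).orElseGet(() -> lastConfirmed.next());
        }
    }
    ```

    With ledgers {1: 3 entries, 2: 2 entries} and the last confirmed entry at 2:1, `nextValid` maps 1:2 to 2:0. Both 2:2 and 3:2 fall back to 2:2, mirroring the new assertions in `ManagedLedgerTest`. Using `Optional` keeps the fallback explicit without relying on exception handling for control flow, while preserving the patch's intent of never returning null.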
---
 .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java   |  2 +-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 13 ++++++++++++-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 ++
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  2 +-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 304429d..66e977f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -780,7 +780,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // validate it before preparing range
         PositionImpl markDeletePosition = this.markDeletePosition;
         PositionImpl readPosition = this.readPosition;
-        return (markDeletePosition.compareTo(readPosition) < 0)
+        return (markDeletePosition != null && readPosition != null && markDeletePosition.compareTo(readPosition) < 0)
                 ? ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition))
                 : 0;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 451ce30..8b12f7b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2830,11 +2830,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public PositionImpl getNextValidPosition(final PositionImpl position) {
+        PositionImpl next;
+        try {
+            next = getNextValidPositionInternal(position);
+        } catch (NullPointerException e) {
+            next = lastConfirmedEntry.getNext();
+            log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
+        }
+        return next;
+    }
+
+    public PositionImpl getNextValidPositionInternal(final PositionImpl position) {
         PositionImpl nextPosition = position.getNext();
         while (!isValidPosition(nextPosition)) {
             Long nextLedgerId = ledgers.ceilingKey(nextPosition.getLedgerId() + 1);
             if (nextLedgerId == null) {
-                return null;
+                throw new NullPointerException();
             }
             nextPosition = PositionImpl.get(nextLedgerId, 0);
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 06fd37d..427a773 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1965,6 +1965,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
         assertEquals(ledger.getNextValidPosition(p1), p2);
         assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a7fa72d..d7fcd41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -624,7 +624,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 future.completeExceptionally(e);
             }
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage());
+            log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex);
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
             return null;