You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/31 12:28:23 UTC

[pulsar] branch master updated: Fix NPE when acknowledge messages at the broker side (#7878) (#7937)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af3a7af  Fix NPE when acknowledge messages at the broker side (#7878) (#7937)
af3a7af is described below

commit af3a7afdfe811b4701d1c19a8783cd33faff0120
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Aug 31 20:27:56 2020 +0800

    Fix NPE when acknowledge messages at the broker side (#7878) (#7937)
---
 .../broker/service/persistent/PersistentSubscription.java    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 24696e9..5476b8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -361,7 +361,9 @@ public class PersistentSubscription implements Subscription {
                 cursor.asyncDelete(positions, deleteCallback, positions);
             }
 
-            dispatcher.getRedeliveryTracker().removeBatch(positions);
+            if(dispatcher != null){
+                dispatcher.getRedeliveryTracker().removeBatch(positions);
+            }
         }
 
         if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
@@ -379,11 +381,15 @@ public class PersistentSubscription implements Subscription {
 
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
             // Notify all consumer that the end of topic was reached
-            dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
+            if(dispatcher != null){
+                dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
+            }
         }
 
         // Signal the dispatchers to give chance to take extra actions
-        dispatcher.acknowledgementWasProcessed();
+        if(dispatcher != null){
+            dispatcher.acknowledgementWasProcessed();
+        }
     }
 
     /**