You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/31 12:28:23 UTC
[pulsar] branch master updated: Fix NPE when acknowledge messages
at the broker side (#7878) (#7937)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af3a7af Fix NPE when acknowledge messages at the broker side (#7878) (#7937)
af3a7af is described below
commit af3a7afdfe811b4701d1c19a8783cd33faff0120
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Aug 31 20:27:56 2020 +0800
Fix NPE when acknowledge messages at the broker side (#7878) (#7937)
---
.../broker/service/persistent/PersistentSubscription.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 24696e9..5476b8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -361,7 +361,9 @@ public class PersistentSubscription implements Subscription {
cursor.asyncDelete(positions, deleteCallback, positions);
}
- dispatcher.getRedeliveryTracker().removeBatch(positions);
+ if(dispatcher != null){
+ dispatcher.getRedeliveryTracker().removeBatch(positions);
+ }
}
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
@@ -379,11 +381,15 @@ public class PersistentSubscription implements Subscription {
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
- dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
+ if(dispatcher != null){
+ dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
+ }
}
// Signal the dispatchers to give chance to take extra actions
- dispatcher.acknowledgementWasProcessed();
+ if(dispatcher != null){
+ dispatcher.acknowledgementWasProcessed();
+ }
}
/**