Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/25 17:50:56 UTC

[pulsar] branch branch-2.9 updated: [Broker] Fix NPE when subscription is already removed (#14363)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 9fc376b  [Broker] Fix NPE when subscription is already removed (#14363)
9fc376b is described below

commit 9fc376bb75a4732122cda9b383a5c2e788207a7c
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Mar 25 12:03:49 2022 -0500

    [Broker] Fix NPE when subscription is already removed (#14363)
    
    * [Broker] Fix NPE when subscription is already removed
    
    * Cover same case for NonPersistentTopic
    
    Master Issue: #14362
    
    There is currently a race condition when we remove a subscription. The race and how to reproduce it are described in #14362. One consequence of the race is that we may try to remove the subscription from the topic twice. The second `subscriptions.remove` call returns null, and calling `getStats` on that result leads to an NPE, as described in the issue.
    
    * Verify that the `sub` is not null before getting its stats.
    
    This is a trivial change.
    
    (cherry picked from commit aee1e7dbc55099c6b7cdc49e7b5e1c4cd66994ce)
---
 .../broker/service/nonpersistent/NonPersistentTopic.java       | 10 ++++++----
 .../pulsar/broker/service/persistent/PersistentTopic.java      | 10 ++++++----
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4a5aa1f..886140eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1026,10 +1026,12 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         // That creates deadlock. so, execute remove it in different thread.
         return CompletableFuture.runAsync(() -> {
             NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
-            // preserve accumulative stats form removed subscription
-            SubscriptionStatsImpl stats = sub.getStats();
-            bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
-            msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+            if (sub != null) {
+                // preserve accumulative stats form removed subscription
+                SubscriptionStatsImpl stats = sub.getStats();
+                bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+                msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+            }
         }, brokerService.executor());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e6cf77f..7052fc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1054,10 +1054,12 @@ public class PersistentTopic extends AbstractTopic
 
     void removeSubscription(String subscriptionName) {
         PersistentSubscription sub = subscriptions.remove(subscriptionName);
-        // preserve accumulative stats form removed subscription
-        SubscriptionStatsImpl stats = sub.getStats(false, false);
-        bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
-        msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+        if (sub != null) {
+            // preserve accumulative stats form removed subscription
+            SubscriptionStatsImpl stats = sub.getStats(false, false);
+            bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+            msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+        }
     }
 
     /**
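
For readers who want to see the guarded removal in isolation, here is a minimal, self-contained sketch of the pattern the two hunks above apply. It is not the Pulsar code: the class `RemoveSubscriptionSketch`, the nested `Stats` type, and the `addSubscription` helper are hypothetical stand-ins, and the use of `ConcurrentHashMap` and `LongAdder` for the map and counters is an assumption made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a topic; not the real PersistentTopic.
public class RemoveSubscriptionSketch {

    // Hypothetical stand-in for SubscriptionStatsImpl, reduced to the
    // two counters that the diff reads.
    static final class Stats {
        final long bytesOutCounter;
        final long msgOutCounter;

        Stats(long bytesOutCounter, long msgOutCounter) {
            this.bytesOutCounter = bytesOutCounter;
            this.msgOutCounter = msgOutCounter;
        }
    }

    // Assumed types: a concurrent map of subscriptions and LongAdder totals.
    final ConcurrentHashMap<String, Stats> subscriptions = new ConcurrentHashMap<>();
    final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
    final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();

    // Hypothetical helper used only to set up the example.
    void addSubscription(String name, long bytesOut, long msgOut) {
        subscriptions.put(name, new Stats(bytesOut, msgOut));
    }

    void removeSubscription(String subscriptionName) {
        // remove() returns null when the key is absent, for example when a
        // racing caller has already removed the same subscription.
        Stats stats = subscriptions.remove(subscriptionName);
        if (stats != null) {
            // Preserve cumulative stats from the removed subscription.
            bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
            msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
        }
    }

    public static void main(String[] args) {
        RemoveSubscriptionSketch topic = new RemoveSubscriptionSketch();
        topic.addSubscription("sub-a", 1024, 10);
        topic.removeSubscription("sub-a");
        // The second removal finds nothing and is a no-op instead of an NPE.
        topic.removeSubscription("sub-a");
        System.out.println(topic.bytesOutFromRemovedSubscriptions.sum()
                + " " + topic.msgOutFromRemovedSubscriptions.sum());
    }
}
```

Because the map's `remove` hands the entry to at most one caller, the stats in this sketch are accumulated exactly once even when the removal is attempted twice. Without the null check, the second call would dereference null, which is the failure reported in #14362.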