You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/25 17:50:56 UTC
[pulsar] branch branch-2.9 updated: [Broker] Fix NPE when subscription is already removed (#14363)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 9fc376b [Broker] Fix NPE when subscription is already removed (#14363)
9fc376b is described below
commit 9fc376bb75a4732122cda9b383a5c2e788207a7c
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Mar 25 12:03:49 2022 -0500
[Broker] Fix NPE when subscription is already removed (#14363)
* [Broker] Fix NPE when subscription is already removed
* Cover same case for NonPersistentTopic
Master Issue: #14362
There is current a race condition when we remove a subscription. The race and how to reproduce it is described in the #14362. One of the consequences of the race is that there is a chance we try to remove the subscription from the topic twice. This leads to an NPE, as described in the issue.
* Verify that the `sub` is not null before getting its stats.
This is a trivial change.
(cherry picked from commit aee1e7dbc55099c6b7cdc49e7b5e1c4cd66994ce)
---
.../broker/service/nonpersistent/NonPersistentTopic.java | 10 ++++++----
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 ++++++----
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4a5aa1f..886140eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1026,10 +1026,12 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
// That creates deadlock. so, execute remove it in different thread.
return CompletableFuture.runAsync(() -> {
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
- // preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats();
- bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
- msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ if (sub != null) {
+ // preserve accumulative stats form removed subscription
+ SubscriptionStatsImpl stats = sub.getStats();
+ bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+ msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ }
}, brokerService.executor());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e6cf77f..7052fc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1054,10 +1054,12 @@ public class PersistentTopic extends AbstractTopic
void removeSubscription(String subscriptionName) {
PersistentSubscription sub = subscriptions.remove(subscriptionName);
- // preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats(false, false);
- bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
- msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ if (sub != null) {
+ // preserve accumulative stats form removed subscription
+ SubscriptionStatsImpl stats = sub.getStats(false, false);
+ bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+ msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ }
}
/**