You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/26 13:10:14 UTC
[pulsar] 03/14: Remove unused listeners if it have no listeners. (#12654)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d342f35bd62850a6f030ff6120ce8bd7a1a9bf1b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Fri Nov 19 18:38:45 2021 +0800
Remove unused listeners if it have no listeners. (#12654)
(cherry picked from commit d74af88a6aed5a7da3139a4228ae29f793ec72b2)
---
.../SystemTopicBasedTopicPoliciesService.java | 21 +++++++++++--
.../SystemTopicBasedTopicPoliciesServiceTest.java | 36 ++++++++++++++++++++--
2 files changed, 52 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 10f47e6..1cc71e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -69,7 +69,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
@VisibleForTesting
final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
- private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
@@ -483,12 +484,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
@Override
public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
- listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
+ listeners.compute(topicName, (k, topicListeners) -> {
+ if (topicListeners == null) {
+ topicListeners = Lists.newCopyOnWriteArrayList();
+ }
+ topicListeners.add(listener);
+ return topicListeners;
+ });
}
@Override
public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
- listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
+ listeners.compute(topicName, (k, topicListeners) -> {
+ if (topicListeners != null){
+ topicListeners.remove(listener);
+ if (topicListeners.isEmpty()) {
+ topicListeners = null;
+ }
+ }
+ return topicListeners;
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 80f3dc9..be52f9a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -76,7 +76,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1");
private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2");
- private NamespaceEventsSystemTopicFactory systemTopicFactory;
private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;
@BeforeMethod(alwaysRun = true)
@@ -95,6 +94,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
}
@Test
+ public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException {
+ TopicName topicName = TopicName.get("test");
+ class TopicPolicyListenerImpl implements TopicPolicyListener<TopicPolicies> {
+
+ @Override
+ public void onUpdate(TopicPolicies data) {
+ //no op.
+ }
+ }
+
+ CompletableFuture<Void> f = CompletableFuture.completedFuture(null).thenRunAsync(() -> {
+ for (int i = 0; i < 100; i++) {
+ TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl();
+ systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+ Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+ Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
+ systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+ }
+ });
+
+ for (int i = 0; i < 100; i++) {
+ TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl();
+ systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+ Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+ Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
+ systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+ }
+
+ f.get();
+ //Some system topics will be added to the listeners. Just check if it contains topicName.
+ Assert.assertFalse(systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName));
+ }
+
+ @Test
public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException {
// Init topic policies
@@ -239,7 +272,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
admin.lookups().lookupTopic(TOPIC4.toString());
admin.lookups().lookupTopic(TOPIC5.toString());
admin.lookups().lookupTopic(TOPIC6.toString());
- systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
}