You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/26 13:10:14 UTC

[pulsar] 03/14: Remove unused listeners if it have no listeners. (#12654)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d342f35bd62850a6f030ff6120ce8bd7a1a9bf1b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Fri Nov 19 18:38:45 2021 +0800

    Remove unused listeners if it have no listeners. (#12654)
    
    (cherry picked from commit d74af88a6aed5a7da3139a4228ae29f793ec72b2)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 21 +++++++++++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 36 ++++++++++++++++++++--
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 10f47e6..1cc71e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -69,7 +69,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @VisibleForTesting
     final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
 
-    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
 
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
@@ -483,12 +484,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     @Override
     public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
-        listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
+        listeners.compute(topicName, (k, topicListeners) -> {
+            if (topicListeners == null) {
+                topicListeners = Lists.newCopyOnWriteArrayList();
+            }
+            topicListeners.add(listener);
+            return topicListeners;
+        });
     }
 
     @Override
     public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
-        listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
+        listeners.compute(topicName, (k, topicListeners) -> {
+            if (topicListeners != null){
+                topicListeners.remove(listener);
+                if (topicListeners.isEmpty()) {
+                    topicListeners = null;
+                }
+            }
+            return topicListeners;
+        });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 80f3dc9..be52f9a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -76,7 +76,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
     private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1");
     private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2");
 
-    private NamespaceEventsSystemTopicFactory systemTopicFactory;
     private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;
 
     @BeforeMethod(alwaysRun = true)
@@ -95,6 +94,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
     }
 
     @Test
+    public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException {
+        TopicName topicName = TopicName.get("test");
+        class TopicPolicyListenerImpl implements TopicPolicyListener<TopicPolicies> {
+
+            @Override
+            public void onUpdate(TopicPolicies data) {
+                //no op.
+            }
+        }
+
+        CompletableFuture<Void> f = CompletableFuture.completedFuture(null).thenRunAsync(() -> {
+            for (int i = 0; i < 100; i++) {
+                TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl();
+                systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+                Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+                Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
+                systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+            }
+        });
+
+        for (int i = 0; i < 100; i++) {
+            TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl();
+            systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+            Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+            Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
+            systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+        }
+
+        f.get();
+        //Some system topics will be added to the listeners. Just check if it contains topicName.
+        Assert.assertFalse(systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName));
+    }
+
+    @Test
     public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException {
 
         // Init topic policies
@@ -239,7 +272,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         admin.lookups().lookupTopic(TOPIC4.toString());
         admin.lookups().lookupTopic(TOPIC5.toString());
         admin.lookups().lookupTopic(TOPIC6.toString());
-        systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
         systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
     }