You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/20 09:08:28 UTC

[GitHub] [pulsar] Jason918 opened a new pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Jason918 opened a new pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904


   <!--
   ### Contribution Checklist
     
     - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
       Skip *Issue XYZ* if there is no associated github issue for this pull request.
       Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ### Motivation
   
   Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions.
   - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A".
   - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed.
   
   A detailed case is designed in the new unit test `testListenerCleanupByPartition`.
   
   ### Modifications
   
   With previous optimization of #12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
     - testListenerCleanupByPartition
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
     
   - [x] `no-need-doc` 
   
   Bug fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753683117



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3137,16 +3136,25 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
         }
     }
 
+    private TopicName getPartitionedTopicName() {

Review comment:
       Great point. This is a quite common usage in broker.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753683117



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3137,16 +3136,25 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
         }
     }
 
+    private TopicName getPartitionedTopicName() {

Review comment:
       Great point. This is a quite common usage in broker.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3140,13 +3139,16 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
-            TopicName topicName = TopicName.get(topic);
-            TopicName cloneTopicName = topicName;
-            if (topicName.isPartitioned()) {
-                cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-            }
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .registerListener(TopicName.getPartitionedTopicName(topic), this);
+        }
+    }
 
-            brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this);
+    private void unregisterTopicPolicyListener() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .unregisterListener(TopicName.getPartitionedTopicName(topic), this);

Review comment:
       I have optimized the unregister implementation in #12654.  See the last line of the unit test, 
   `listMap.get(topicName)` returns null instead of empty map.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Jason918 commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753782206



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3140,13 +3139,16 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
-            TopicName topicName = TopicName.get(topic);
-            TopicName cloneTopicName = topicName;
-            if (topicName.isPartitioned()) {
-                cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-            }
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .registerListener(TopicName.getPartitionedTopicName(topic), this);
+        }
+    }
 
-            brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this);
+    private void unregisterTopicPolicyListener() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .unregisterListener(TopicName.getPartitionedTopicName(topic), this);

Review comment:
       I have optimized the unregister implementation in #12654.  See the last line of the unit test, 
   `listMap.get(topicName)` returns null instead of empty map.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753671643



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3137,16 +3136,25 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
         }
     }
 
+    private TopicName getPartitionedTopicName() {

Review comment:
       this method looks like an utility method that should be moved to the TopicName class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753671643



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3137,16 +3136,25 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
         }
     }
 
+    private TopicName getPartitionedTopicName() {

Review comment:
       this method looks like an utility method that should be moved to the TopicName class

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3137,16 +3136,25 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
         }
     }
 
+    private TopicName getPartitionedTopicName() {

Review comment:
       most of the body of this method looks like an utility method that should be moved to the TopicName class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753781181



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3140,13 +3139,16 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
-            TopicName topicName = TopicName.get(topic);
-            TopicName cloneTopicName = topicName;
-            if (topicName.isPartitioned()) {
-                cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-            }
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .registerListener(TopicName.getPartitionedTopicName(topic), this);
+        }
+    }
 
-            brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this);
+    private void unregisterTopicPolicyListener() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .unregisterListener(TopicName.getPartitionedTopicName(topic), this);

Review comment:
       In current `unregisterListener` implementation, it will make listeners key keep growing even thought  the listener unregister called.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753671643



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3137,16 +3136,25 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
         }
     }
 
+    private TopicName getPartitionedTopicName() {

Review comment:
       most of the body of this method looks like an utility method that should be moved to the TopicName class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hangc0276 commented on a change in pull request #12904: [broker] Fix topic policy listener deleted by mistake.

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #12904:
URL: https://github.com/apache/pulsar/pull/12904#discussion_r753781181



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -3140,13 +3139,16 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
-            TopicName topicName = TopicName.get(topic);
-            TopicName cloneTopicName = topicName;
-            if (topicName.isPartitioned()) {
-                cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-            }
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .registerListener(TopicName.getPartitionedTopicName(topic), this);
+        }
+    }
 
-            brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this);
+    private void unregisterTopicPolicyListener() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            brokerService.getPulsar().getTopicPoliciesService()
+                    .unregisterListener(TopicName.getPartitionedTopicName(topic), this);

Review comment:
       In current `unregisterListener` implementation, it will make listeners key keep growing even thought  the listener unregister called.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org