You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 13:26:46 UTC

[pulsar] 02/06: Fix partitioned system topic check bug (#10529)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 749c6832f8d225718640515f66fec7346ff6f033
Author: hangc0276 <ch...@apache.org>
AuthorDate: Sat May 15 09:58:19 2021 +0800

    Fix partitioned system topic check bug (#10529)
    
    ### Motivation
    When checking a partitioned topic whether a system topic, it will always be `false`. The check logic is.
    ```Java
    static boolean isSystemTopic(TopicName topicName) {
            return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
        }
    ```
    ```Java
    public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";
    ```
    The partitioned topic name is like `__change_events-partition-0`.
    
    ### Modification
    1. Trim the partition suffix for the topic local name if exists.
    2. Add a test to cover this case.
    
    (cherry picked from commit 4a8d40c7540c5ec337a4c086db13299102380e12)
---
 .../apache/pulsar/broker/systopic/SystemTopicClient.java   |  5 +++++
 .../systopic/NamespaceEventsSystemTopicServiceTest.java    | 14 ++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index c5a3352..855f30d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -169,6 +169,11 @@ public interface SystemTopicClient {
     }
 
     static boolean isSystemTopic(TopicName topicName) {
+        if (topicName.isPartitioned()) {
+            return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME
+                    .equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
+        }
+
         return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 52da458..1363e1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -24,9 +24,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.events.TopicPoliciesEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -106,6 +108,18 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
         Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 0);
     }
 
+    @Test(timeOut = 30000)
+    public void checkSystemTopic() throws PulsarAdminException {
+        final String systemTopic = "persistent://" + NAMESPACE1 + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic";
+        admin.topics().createPartitionedTopic(normalTopic, 3);
+        TopicName systemTopicName = TopicName.get(systemTopic);
+        TopicName normalTopicName = TopicName.get(normalTopic);
+
+        Assert.assertEquals(SystemTopicClient.isSystemTopic(systemTopicName), true);
+        Assert.assertEquals(SystemTopicClient.isSystemTopic(normalTopicName), false);
+    }
+
     private void prepareData() throws PulsarAdminException {
         admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
         admin.tenants().createTenant("system-topic",