You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/05/25 19:48:15 UTC

[pulsar] branch branch-3.0 updated: [fix][broker] partitioned __change_events topic is policy topic (#20392)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b190a8acc4b [fix][broker] partitioned __change_events topic is policy topic (#20392)
b190a8acc4b is described below

commit b190a8acc4b75ed37c9696dd60a709456b3f93d8
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu May 25 03:46:31 2023 -0500

    [fix][broker] partitioned __change_events topic is policy topic (#20392)
    
    (cherry picked from commit 9918bced4465e0b0746a7959550c90cb76ae945f)
---
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../pulsar/common/naming/SystemTopicNames.java     |  6 +--
 .../pulsar/common/naming/SystemTopicNamesTest.java | 47 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 15854f55c5c..98e51a2e3ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -819,7 +819,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
 
             try {
-                if (!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
+                if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
                         && !checkSubscriptionTypesEnable(subType)) {
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Topic[{" + topic + "}] doesn't support "
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 8fc7d014b57..716d9bc31fa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -81,7 +81,7 @@ public class SystemTopicNames {
         if (topic == null) {
             return false;
         }
-        return TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
+        return TopicName.getPartitionedTopicName(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
     }
 
     public static boolean isTransactionInternalName(TopicName topicName) {
@@ -92,7 +92,7 @@ public class SystemTopicNames {
     }
 
     public static boolean isSystemTopic(TopicName topicName) {
-        TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
-        return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName);
+        TopicName nonPartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+        return isEventSystemTopic(nonPartitionedTopicName) || isTransactionInternalName(nonPartitionedTopicName);
     }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
new file mode 100644
index 00000000000..92d93021973
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import static org.testng.AssertJUnit.assertEquals;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class SystemTopicNamesTest { // Checks how SystemTopicNames classifies __change_events names.
+
+    @DataProvider(name = "topicPoliciesSystemTopicNames")
+    public static Object[][] topicPoliciesSystemTopicNames() { // Rows are {topic name, expected result}.
+        return new Object[][] {
+                {"persistent://public/default/__change_events", true}, // bare local name
+                {"persistent://public/default/__change_events-partition-0", true}, // partition suffix
+                {"persistent://random-tenant/random-ns/__change_events", true},
+                {"persistent://random-tenant/random-ns/__change_events-partition-1", true},
+                {"persistent://public/default/not_really__change_events", false}, // extra prefix
+                {"persistent://public/default/__change_events-diff-suffix", false}, // other suffix
+                {"persistent://a/b/not_really__change_events", false},
+        };
+    }
+
+    @Test(dataProvider = "topicPoliciesSystemTopicNames") // same expectation for all three checks
+    public void testIsTopicPoliciesSystemTopic(String topicName, boolean expectedResult) {
+        assertEquals(expectedResult, SystemTopicNames.isTopicPoliciesSystemTopic(topicName));
+        assertEquals(expectedResult, SystemTopicNames.isSystemTopic(TopicName.get(topicName)));
+        assertEquals(expectedResult, SystemTopicNames.isEventSystemTopic(TopicName.get(topicName)));
+    }
+}