You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/05/25 19:48:15 UTC
[pulsar] branch branch-3.0 updated: [fix][broker] partitioned __change_events topic is policy topic (#20392)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b190a8acc4b [fix][broker] partitioned __change_events topic is policy topic (#20392)
b190a8acc4b is described below
commit b190a8acc4b75ed37c9696dd60a709456b3f93d8
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu May 25 03:46:31 2023 -0500
[fix][broker] partitioned __change_events topic is policy topic (#20392)
(cherry picked from commit 9918bced4465e0b0746a7959550c90cb76ae945f)
---
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../pulsar/common/naming/SystemTopicNames.java | 6 +--
.../pulsar/common/naming/SystemTopicNamesTest.java | 47 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 15854f55c5c..98e51a2e3ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -819,7 +819,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
try {
- if (!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
+ if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& !checkSubscriptionTypesEnable(subType)) {
return FutureUtil.failedFuture(
new NotAllowedException("Topic[{" + topic + "}] doesn't support "
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 8fc7d014b57..716d9bc31fa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -81,7 +81,7 @@ public class SystemTopicNames {
if (topic == null) {
return false;
}
- return TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
+ return TopicName.getPartitionedTopicName(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
}
public static boolean isTransactionInternalName(TopicName topicName) {
@@ -92,7 +92,7 @@ public class SystemTopicNames {
}
public static boolean isSystemTopic(TopicName topicName) {
- TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
- return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName);
+ TopicName nonPartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+ return isEventSystemTopic(nonPartitionedTopicName) || isTransactionInternalName(nonPartitionedTopicName);
}
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
new file mode 100644
index 00000000000..92d93021973
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import static org.testng.AssertJUnit.assertEquals;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class SystemTopicNamesTest {
+
+ @DataProvider(name = "topicPoliciesSystemTopicNames")
+ public static Object[][] topicPoliciesSystemTopicNames() {
+ return new Object[][] {
+ {"persistent://public/default/__change_events", true},
+ {"persistent://public/default/__change_events-partition-0", true},
+ {"persistent://random-tenant/random-ns/__change_events", true},
+ {"persistent://random-tenant/random-ns/__change_events-partition-1", true},
+ {"persistent://public/default/not_really__change_events", false},
+ {"persistent://public/default/__change_events-diff-suffix", false},
+ {"persistent://a/b/not_really__change_events", false},
+ };
+ }
+
+ @Test(dataProvider = "topicPoliciesSystemTopicNames")
+ public void testIsTopicPoliciesSystemTopic(String topicName, boolean expectedResult) {
+ assertEquals(expectedResult, SystemTopicNames.isTopicPoliciesSystemTopic(topicName));
+ assertEquals(expectedResult, SystemTopicNames.isSystemTopic(TopicName.get(topicName)));
+ assertEquals(expectedResult, SystemTopicNames.isEventSystemTopic(TopicName.get(topicName)));
+ }
+}