You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 13:26:46 UTC
[pulsar] 02/06: Fix partitioned system topic check bug (#10529)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 749c6832f8d225718640515f66fec7346ff6f033
Author: hangc0276 <ch...@apache.org>
AuthorDate: Sat May 15 09:58:19 2021 +0800
Fix partitioned system topic check bug (#10529)
### Motivation
When checking whether a partitioned topic is a system topic, the result is always `false`. The check logic is:
```Java
static boolean isSystemTopic(TopicName topicName) {
return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
}
```
```Java
public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";
```
The local name of a partition of a partitioned topic looks like `__change_events-partition-0`, so it never equals `__change_events` and the check fails.
### Modification
1. Trim the partition suffix from the topic's local name, if present, before comparing.
2. Add a test to cover this case.
(cherry picked from commit 4a8d40c7540c5ec337a4c086db13299102380e12)
---
.../apache/pulsar/broker/systopic/SystemTopicClient.java | 5 +++++
.../systopic/NamespaceEventsSystemTopicServiceTest.java | 14 ++++++++++++++
2 files changed, 19 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index c5a3352..855f30d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -169,6 +169,11 @@ public interface SystemTopicClient {
}
static boolean isSystemTopic(TopicName topicName) {
+ if (topicName.isPartitioned()) {
+ return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME
+ .equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
+ }
+
return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 52da458..1363e1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -24,9 +24,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.events.TopicPoliciesEvent;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -106,6 +108,18 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
Assert.assertEquals(systemTopicClientForNamespace1.getReaders().size(), 0);
}
+ @Test(timeOut = 30000)
+ public void checkSystemTopic() throws PulsarAdminException {
+ final String systemTopic = "persistent://" + NAMESPACE1 + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+ final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic";
+ admin.topics().createPartitionedTopic(normalTopic, 3);
+ TopicName systemTopicName = TopicName.get(systemTopic);
+ TopicName normalTopicName = TopicName.get(normalTopic);
+
+ Assert.assertEquals(SystemTopicClient.isSystemTopic(systemTopicName), true);
+ Assert.assertEquals(SystemTopicClient.isSystemTopic(normalTopicName), false);
+ }
+
private void prepareData() throws PulsarAdminException {
admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
admin.tenants().createTenant("system-topic",