You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 09:51:11 UTC

[pulsar] 09/26: [fix][broker] Fix creating system namespace topic failure. (#14949)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fed8b8c324685d907e9867eac5a14ed8a5fefa78
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Apr 3 21:50:45 2022 +0800

    [fix][broker] Fix creating system namespace topic failure. (#14949)
    
    (cherry picked from commit f3b87b65c6946eb197c1eece22cff8ff04e16fcb)
---
 .../pulsar/broker/service/BrokerService.java       |  3 +-
 .../systopic/PartitionedSystemTopicTest.java       | 33 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9f14da56653..261d006ebd0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,7 +24,6 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
-import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -2606,7 +2605,7 @@ public class BrokerService implements Closeable {
 
     public boolean isAllowAutoTopicCreation(final TopicName topicName) {
         //System topic can always be created automatically
-        if (pulsar.getConfiguration().isSystemTopicEnabled() && checkTopicIsEventsNames(topicName)) {
+        if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
             return true;
         }
         AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index bbd3cae7117..ff45c140f56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -19,21 +19,28 @@
 package org.apache.pulsar.broker.systopic;
 
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 @Test(groups = "broker")
 public class PartitionedSystemTopicTest extends BrokerTestBase {
@@ -104,4 +111,28 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testProduceAndConsumeUnderSystemNamespace() throws Exception {
+        TenantInfo tenantInfo = TenantInfo
+                .builder()
+                .adminRoles(Sets.newHashSet("admin"))
+                .allowedClusters(Sets.newHashSet("test"))
+                .build();
+        admin.tenants().createTenant("pulsar", tenantInfo);
+        admin.namespaces().createNamespace("pulsar/system", 2);
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("pulsar/system/__topic-1").create();
+        producer.send("test".getBytes(StandardCharsets.UTF_8));
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic("pulsar/system/__topic-1")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("sub1")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        Message<byte[]> receive = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(receive);
+    }
+
 }