You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/13 06:50:01 UTC

[pulsar] 01/02: [fix][broker] Fix system service namespace create internal event topic. (#17867)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 760b1adf56f5424dedfed4c41805c8874463f29b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Oct 13 13:17:16 2022 +0800

    [fix][broker] Fix system service namespace create internal event topic. (#17867)
---
 .../pulsar/broker/service/BrokerService.java       |  3 ++-
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../systopic/PartitionedSystemTopicTest.java       | 30 +++++++++++++++++++---
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b704044e139..b437f976837 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1507,7 +1507,8 @@ public class BrokerService implements Closeable {
                     RetentionPolicies retentionPolicies = null;
                     OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+                            && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {
                         try {
                             TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                             if (topicPolicies != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f5db6e2311b..26879bff3ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -101,6 +101,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
                                                          TopicPolicies policies) {
+        if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
+        }
         CompletableFuture<Void> result = new CompletableFuture<>();
         try {
             createSystemTopicFactoryIfNeeded();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 31b3ee5355a..bb8749e9226 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -524,7 +524,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
      * @param namespaceName
      * @throws Exception
      */
-    @Test(dataProvider = "namespaceNames", timeOut = 10000)
+    @Test(dataProvider = "namespaceNames", timeOut = 30000)
     public void testResetCursorOnPosition(String namespaceName) throws Exception {
         final String topicName = "persistent://prop-xyz/use/" + namespaceName + "/resetPosition";
         final int totalProducedMessages = 50;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d4ed12573f3..d2ae23bb6c9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -172,10 +173,31 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
         Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
-        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
-        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
-                    NullLedgerOffloader.INSTANCE);
+    }
+
+    @Test
+    public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        Optional<Topic> optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertTrue(optionalTopic.isEmpty());
+    }
+
+    @Test
+    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        for (int partition = 0; partition < PARTITIONS; partition ++) {
+            pulsar.getBrokerService()
+                    .getTopic(topicName.getPartition(partition).toString(), true).join();
+        }
+        Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {
+            admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
         });
     }