Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/28 09:05:37 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #17867: [fix][broker] Fix system service namespace create internal event topic.

Technoboy- opened a new pull request, #17867:
URL: https://github.com/apache/pulsar/pull/17867

   ### Motivation
   
   Fix the system service namespace (heartbeat namespace) creating internal `__change_events` topics, as shown in the log below:
   
   ```
   2022-09-28T17:04:24,833 - INFO  - [metadata-store-12-1:ServerCnx@1474] - [/127.0.0.1:50223] Created new producer: Producer{topic=SystemTopic{topic=persistent://pulsar/localhost:50220/healthcheck}, client=/127.0.0.1:50223, producerName=test-0-0, producerId=0}
   2022-09-28T17:04:24,845 - INFO  - [pulsar-io-6-3:ConsumerImpl@822] - [persistent://pulsar/localhost:50220/__change_events-partition-0][multiTopicsReader-895a02f0d4] Subscribing to topic on cnx [id: 0xc6ef1b83, L:/127.0.0.1:50223 - R:localhost/127.0.0.1:50218], consumerId 0
   2022-09-28T17:04:24,848 - INFO  - [pulsar-io-6-3:ConsumerImpl@822] - [persistent://pulsar/localhost:50220/__change_events-partition-1][multiTopicsReader-895a02f0d4] Subscribing to topic on cnx [id: 0xc6ef1b83, L:/127.0.0.1:50223 - R:localhost/127.0.0.1:50218], consumerId 1
   2022-09-28T17:04:24,849 - INFO  - [pulsar-io-6-3:ConsumerImpl@822] - [persistent://pulsar/localhost:50220/__change_events-partition-2][multiTopicsReader-895a02f0d4] Subscribing to topic on cnx [id: 0xc6ef1b83, L:/127.0.0.1:50223 - R:localhost/127.0.0.1:50218], consumerId 2
   2022-09-28T17:04:24,853 - INFO  - [pulsar-io-6-3:ConsumerImpl@822] - [persistent://pulsar/localhost:50220/__change_events-partition-3][multiTopicsReader-895a02f0d4] Subscribing to topic on cnx [id: 0xc6ef1b83, L:/127.0.0.1:50223 - R:localhost/127.0.0.1:50218], consumerId 3
   2022-09-28T17:04:24,853 - INFO  - [pulsar-io-6-3:ConsumerImpl@822] - [persistent://pulsar/localhost:50220/__change_events-partition-4][multiTopicsReader-895a02f0d4] Subscribing to topic on cnx [id: 0xc6ef1b83, L:/127.0.0.1:50223 - R:localhost/127.0.0.1:50218], consumerId 4
   2022-09-28T17:04:24,856 - INFO  - [pulsar-io-6-3:ProducerImpl@1694] - [persistent://pulsar/localhost:50220/healthcheck] [test-0-0] Created producer on cnx [id: 0xc6ef1b83, L:/127.0.0.1:50223 - R:localhost/127.0.0.1:50218]
   ```
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   
   
   ### Matching PR in forked repository
   
   PR in forked repository:  https://github.com/Technoboy-/pulsar/pull/8
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r990587827


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -101,6 +101,10 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
                                                          TopicPolicies policies) {
+        if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {

Review Comment:
   Why don't we use `NamespaceService#isSystemServiceNamespace`? 
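
A minimal standalone sketch of the distinction this question turns on: a heartbeat-only check versus a broader system-service check. The class name and the regular expressions are assumptions for illustration, modeled on the `pulsar/localhost:50220` namespace in the log above; they are not copied from `NamespaceService`, and the `sla-monitor` pattern in particular is assumed.

```java
import java.util.regex.Pattern;

class NamespaceChecksSketch {
    // Assumed patterns, for illustration only (not taken from NamespaceService).
    static final Pattern HEARTBEAT_V1 = Pattern.compile("pulsar/[^/]+/[^:/]+:\\d+");
    static final Pattern HEARTBEAT_V2 = Pattern.compile("pulsar/[^:/]+:\\d+");
    static final Pattern SLA_MONITOR = Pattern.compile("sla-monitor/[^/]+/[^:/]+:\\d+");

    // Narrow check: matches only the heartbeat namespaces.
    static boolean isHeartbeatNamespace(String namespace) {
        return HEARTBEAT_V1.matcher(namespace).matches()
                || HEARTBEAT_V2.matcher(namespace).matches();
    }

    // Broad check: heartbeat namespaces plus the assumed SLA monitor namespace.
    static boolean isSystemServiceNamespace(String namespace) {
        return isHeartbeatNamespace(namespace)
                || SLA_MONITOR.matcher(namespace).matches();
    }

    public static void main(String[] args) {
        String[] samples = {
            "pulsar/localhost:50220",
            "sla-monitor/test/localhost:8080",
            "public/default"
        };
        for (String ns : samples) {
            System.out.println(ns + " heartbeat=" + isHeartbeatNamespace(ns)
                    + " systemService=" + isSystemServiceNamespace(ns));
        }
    }
}
```

Under these assumed patterns the two checks agree on heartbeat namespaces and differ only for other system service namespaces, so the choice matters only if those should be skipped as well.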





[GitHub] [pulsar] Technoboy- merged pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #17867:
URL: https://github.com/apache/pulsar/pull/17867




[GitHub] [pulsar] codecov-commenter commented on pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#issuecomment-1276925939

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17867?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > :exclamation: No coverage uploaded for pull request base (`master@7e420c6`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #17867   +/-   ##
   =========================================
     Coverage          ?   71.05%           
     Complexity        ?      438           
   =========================================
     Files             ?       26           
     Lines             ?     2246           
     Branches          ?      245           
   =========================================
     Hits              ?     1596           
     Misses            ?      477           
     Partials          ?      173           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `71.05% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r990589450


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -101,6 +101,10 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
                                                          TopicPolicies policies) {
+        if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {

Review Comment:
   This keeps it consistent with line 240.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r984151426


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java:
##########
@@ -172,10 +173,31 @@ public void testHealthCheckTopicNotOffload() throws Exception {
         LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
         Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
-        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
-        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
-                    NullLedgerOffloader.INSTANCE);
+    }
+
+    @Test
+    public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        Optional<Topic> optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertTrue(optionalTopic.isEmpty());
+    }
+
+    @Test
+    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        for (int partition = 0; partition < PARTITIONS; partition ++) {
+            pulsar.getBrokerService()
+                    .getTopic(topicName.getPartition(partition).toString(), true).join();
+        }
+        Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {

Review Comment:
   @mattisonchao will push a fix for this.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r990587575


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1567,7 +1567,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
                     RetentionPolicies retentionPolicies = null;
                     OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+                            && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {

Review Comment:
   Maybe we could also move the `!NamespaceService.isSystemServiceNamespace(namespace.toString())` check into `pulsar.getTopicPoliciesService().getTopicPolicies(topicName)`?
   
   I ask because you already perform this check in `SystemTopicBasedTopicPoliciesService#sendTopicPolicyEvent`.
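
One way to picture this suggestion is a policies service that applies the guard on both its read and write paths, so that callers such as `getManagedLedgerConfig` need no check of their own. The sketch below is hypothetical throughout: `PoliciesService`, its methods, and the namespace pattern are illustrative stand-ins, not Pulsar's `TopicPoliciesService` API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class CentralizedGuardSketch {

    // Hypothetical stand-in for a system-service-namespace check (pattern is assumed).
    static boolean isSystemServiceNamespace(String namespace) {
        return namespace.matches("pulsar/[^:/]+:\\d+");
    }

    // Hypothetical in-memory policies service; not Pulsar's TopicPoliciesService.
    static class PoliciesService {
        private final Map<String, Integer> maxConsumers = new ConcurrentHashMap<>();

        // Read path: system service namespaces never report topic-level policies.
        Optional<Integer> getMaxConsumers(String namespace, String topic) {
            if (isSystemServiceNamespace(namespace)) {
                return Optional.empty();
            }
            return Optional.ofNullable(maxConsumers.get(namespace + "/" + topic));
        }

        // Write path: reject the update instead of creating any backing state.
        CompletableFuture<Void> setMaxConsumers(String namespace, String topic, int value) {
            if (isSystemServiceNamespace(namespace)) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Topic policies are not allowed in " + namespace));
            }
            maxConsumers.put(namespace + "/" + topic, value);
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        PoliciesService service = new PoliciesService();
        service.setMaxConsumers("public/default", "orders", 2).join();
        System.out.println(service.getMaxConsumers("public/default", "orders"));
        System.out.println(service.setMaxConsumers("pulsar/localhost:50220", "healthcheck", 2)
                .isCompletedExceptionally());
    }
}
```

With the guard inside the service, every caller gets the same behavior for system service namespaces, at the cost of hiding the special case from call sites that might want to handle it explicitly.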





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r990589262


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1567,7 +1567,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
                     RetentionPolicies retentionPolicies = null;
                     OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+                            && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {

Review Comment:
   Ah, we should avoid creating the topic first. This is the root cause.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r982435958


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java:
##########
@@ -172,10 +173,31 @@ public void testHealthCheckTopicNotOffload() throws Exception {
         LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
         Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
-        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
-        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
-                    NullLedgerOffloader.INSTANCE);
+    }
+
+    @Test
+    public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        Optional<Topic> optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertTrue(optionalTopic.isEmpty());
+    }
+
+    @Test
+    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        for (int partition = 0; partition < PARTITIONS; partition ++) {
+            pulsar.getBrokerService()
+                    .getTopic(topicName.getPartition(partition).toString(), true).join();
+        }
+        Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {

Review Comment:
   Shouldn't this be `NotAllowedException`?





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17867: [fix][broker] Fix system service namespace create internal event topic.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17867:
URL: https://github.com/apache/pulsar/pull/17867#discussion_r983107135


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java:
##########
@@ -172,10 +173,31 @@ public void testHealthCheckTopicNotOffload() throws Exception {
         LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
         Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
-        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
-        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
-                    NullLedgerOffloader.INSTANCE);
+    }
+
+    @Test
+    public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        Optional<Topic> optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertTrue(optionalTopic.isEmpty());
+    }
+
+    @Test
+    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        for (int partition = 0; partition < PARTITIONS; partition ++) {
+            pulsar.getBrokerService()
+                    .getTopic(topicName.getPartition(partition).toString(), true).join();
+        }
+        Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {

Review Comment:
   Yes. I found another issue related to this.


