You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/08 08:26:38 UTC

[pulsar] 02/02: [improve][broker] System topic writer/reader connection not counted (#18603)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cfd04798789a3f2d5b9bcce17513fc84c4cb3449
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800

    [improve][broker] System topic writer/reader connection not counted (#18603)
    
    This PR is a supplement to #18369.
    - `AbstractTopic.isSameAddressProducersExceeded()`
    - `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
---
 .../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java    | 6 +++++-
 .../main/java/org/apache/pulsar/broker/service/AbstractTopic.java   | 3 +++
 .../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java   | 4 +++-
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d6f441f02bf..677b3a84a4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -255,8 +255,12 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
     protected abstract boolean isConsumersExceededOnSubscription();
 
     protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
+        if (topic.isSystemTopic()) {
+            return false;
+        }
         Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
-        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
+        return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0
+                && maxConsumersPerSubscription <= consumerSize;
     }
 
     private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f93e0b7e5cf..c1fcee4a059 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -453,6 +453,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     }
 
     protected boolean isSameAddressProducersExceeded(Producer producer) {
+        if (isSystemTopic() || producer.isRemote()) {
+            return false;
+        }
         final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
                 .getMaxSameAddressProducersPerTopic();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 5af983ed3af..3cd25d3b00c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -267,6 +267,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         admin.namespaces().createNamespace(ns, 2);
         admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
 
+        conf.setMaxSameAddressConsumersPerTopic(1);
         admin.namespaces().setMaxConsumersPerTopic(ns, 1);
         admin.topicPolicies().setMaxConsumers(topic, 1);
         NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -275,8 +276,9 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
         SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
 
+        conf.setMaxSameAddressProducersPerTopic(1);
+        admin.namespaces().setMaxProducersPerTopic(ns, 1);
         admin.topicPolicies().setMaxProducers(topic, 1);
-
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);