You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/08 08:32:19 UTC

[pulsar] branch branch-2.10 updated: [improve][broker] System topic writer/reader connection not counted (#18603)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b33bff98d0d [improve][broker] System topic writer/reader connection not counted (#18603)
b33bff98d0d is described below

commit b33bff98d0da37cd7c194aa2264a13ebceb802bc
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800

    [improve][broker] System topic writer/reader connection not counted (#18603)
    
    This PR is a supplement to #18369. It makes the following limit checks skip system topics:
    - `AbstractTopic.isSameAddressProducersExceeded()`
    - `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
---
 .../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java    | 6 +++++-
 .../main/java/org/apache/pulsar/broker/service/AbstractTopic.java   | 3 +++
 .../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java   | 4 +++-
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 3d3f3db970c..c9c0300da05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -288,8 +288,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
     protected abstract boolean isConsumersExceededOnSubscription();
 
     protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
+        if (topic.isSystemTopic()) {
+            return false;
+        }
         Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
-        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
+        return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0
+                && maxConsumersPerSubscription <= consumerSize;
     }
 
     private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index d0451d5a5fe..8b85615efc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -452,6 +452,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     }
 
     protected boolean isSameAddressProducersExceeded(Producer producer) {
+        if (isSystemTopic() || producer.isRemote()) {
+            return false;
+        }
         final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
                 .getMaxSameAddressProducersPerTopic();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 89834d300f6..e936b974e76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -244,6 +244,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         admin.namespaces().createNamespace(ns, 2);
         admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
 
+        conf.setMaxSameAddressConsumersPerTopic(1);
         admin.namespaces().setMaxConsumersPerTopic(ns, 1);
         admin.topicPolicies().setMaxConsumers(topic, 1);
         NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -252,8 +253,9 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
         SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
 
+        conf.setMaxSameAddressProducersPerTopic(1);
+        admin.namespaces().setMaxProducersPerTopic(ns, 1);
         admin.topicPolicies().setMaxProducers(topic, 1);
-
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);