You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/08 08:26:38 UTC
[pulsar] 02/02: [improve][broker] System topic writer/reader connection not counted (#18603)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cfd04798789a3f2d5b9bcce17513fc84c4cb3449
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800
[improve][broker] System topic writer/reader connection not counted (#18603)
This PR is a supplement to #18369.
- `AbstractTopic.isSameAddressProducersExceeded()`
- `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
---
.../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java | 6 +++++-
.../main/java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 +++
.../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 +++-
3 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d6f441f02bf..677b3a84a4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -255,8 +255,12 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
protected abstract boolean isConsumersExceededOnSubscription();
protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
+ if (topic.isSystemTopic()) {
+ return false;
+ }
Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
- return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
+ return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0
+ && maxConsumersPerSubscription <= consumerSize;
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f93e0b7e5cf..c1fcee4a059 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -453,6 +453,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
}
protected boolean isSameAddressProducersExceeded(Producer producer) {
+ if (isSystemTopic() || producer.isRemote()) {
+ return false;
+ }
final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 5af983ed3af..3cd25d3b00c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -267,6 +267,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
admin.namespaces().createNamespace(ns, 2);
admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+ conf.setMaxSameAddressConsumersPerTopic(1);
admin.namespaces().setMaxConsumersPerTopic(ns, 1);
admin.topicPolicies().setMaxConsumers(topic, 1);
NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -275,8 +276,9 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
+ conf.setMaxSameAddressProducersPerTopic(1);
+ admin.namespaces().setMaxProducersPerTopic(ns, 1);
admin.topicPolicies().setMaxProducers(topic, 1);
-
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);