You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/06 15:05:27 UTC

[pulsar] branch branch-2.9 updated: [improve][broker] System topic writer/reader connection not counted. (#18369)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 1d01714348d [improve][broker] System topic writer/reader connection not counted. (#18369)
1d01714348d is described below

commit 1d01714348d3995ec5199665a1fa2a1220911688
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Nov 23 14:23:57 2022 +0800

    [improve][broker] System topic writer/reader connection not counted. (#18369)
    
    (cherry picked from commit b219ccac334c4b57df48d07e78ea381f54db1a7d)
---
 .../pulsar/broker/service/AbstractTopic.java       | 19 ++++++++--
 .../broker/service/persistent/PersistentTopic.java |  3 ++
 .../pulsar/broker/service/ReplicatorTest.java      | 32 +++++++++++++++++
 .../systopic/PartitionedSystemTopicTest.java       | 40 +++++++++++++++++++++-
 4 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 7c8d3b24223..eb06a41d42f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -164,7 +164,10 @@ public abstract class AbstractTopic implements Topic {
         }
     }
 
-    protected boolean isProducersExceeded() {
+    protected boolean isProducersExceeded(Producer producer) {
+        if (isSystemTopic() || producer.isRemote()) {
+            return false;
+        }
         Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
 
         if (maxProducers == null) {
@@ -175,12 +178,16 @@ public abstract class AbstractTopic implements Topic {
         }
         maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
                 .getConfiguration().getMaxProducersPerTopic();
-        if (maxProducers > 0 && maxProducers <= producers.size()) {
+        if (maxProducers > 0 && maxProducers <= producers.size() && maxProducers <= getUserCreatedProducersSize()) {
             return true;
         }
         return false;
     }
 
+    private long getUserCreatedProducersSize() {
+        return producers.values().stream().filter(p -> !p.isRemote()).count();
+    }
+
     protected boolean isSameAddressProducersExceeded(Producer producer) {
         final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
                 .getMaxSameAddressProducersPerTopic();
@@ -206,6 +213,9 @@ public abstract class AbstractTopic implements Topic {
     }
 
     protected boolean isConsumersExceededOnTopic() {
+        if (isSystemTopic()) {
+            return false;
+        }
         Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
         if (maxConsumers == null) {
 
@@ -225,6 +235,9 @@ public abstract class AbstractTopic implements Topic {
     }
 
     protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) {
+        if (isSystemTopic()) {
+            return false;
+        }
         final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration()
                 .getMaxSameAddressConsumersPerTopic();
 
@@ -652,7 +665,7 @@ public abstract class AbstractTopic implements Topic {
     }
 
     protected void internalAddProducer(Producer producer) throws BrokerServiceException {
-        if (isProducersExceeded()) {
+        if (isProducersExceeded(producer)) {
             log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
             throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index df90547c723..f630ed32d47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3284,6 +3284,9 @@ public class PersistentTopic extends AbstractTopic
     }
 
     private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
+        if (isSystemTopic()) {
+            return false;
+        }
         //Existing subscriptions are not affected
         if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) {
             return false;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d9ac04c3fc2..8fa267c4e35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
@@ -1394,4 +1395,35 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
+    @Test
+    public void testReplicatorProducerNotExceed() throws Exception {
+        log.info("--- testReplicatorProducerNotExceed ---");
+        String namespace1 = "pulsar/ns11";
+        admin1.namespaces().createNamespace(namespace1);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
+        final TopicName dest1 = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
+        String namespace2 = "pulsar/ns22";
+        admin2.namespaces().createNamespace(namespace2);
+        admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
+        final TopicName dest2 = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2"));
+        admin1.topics().createPartitionedTopic(dest1.toString(), 1);
+        admin1.topicPolicies().setMaxProducers(dest1.toString(), 1);
+        admin2.topics().createPartitionedTopic(dest2.toString(), 1);
+        admin2.topicPolicies().setMaxProducers(dest2.toString(), 1);
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest1);
+        log.info("--- Starting producer1 --- " + url1);
+
+        producer1.produce(1);
+
+        @Cleanup
+        MessageProducer producer2 = new MessageProducer(url2, dest2);
+        log.info("--- Starting producer2 --- " + url2);
+
+        producer2.produce(1);
+
+        Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index cf45d614f0f..ce4f44dd339 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -179,7 +180,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
     }
 
     @Test
-    private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
+    public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
         final String ns = "prop/ns-test";
         final String topic = ns + "/topic-1";
 
@@ -234,4 +235,41 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
             Assert.fail("failed to create producer");
         }
     }
+
+    @Test
+    public void testSystemTopicNotCheckExceed() throws Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+
+        admin.namespaces().setMaxConsumersPerTopic(ns, 1);
+        admin.topicPolicies().setMaxConsumers(topic, 1);
+        NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
+        TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory
+                .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
+        SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
+        SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
+
+        admin.topicPolicies().setMaxProducers(topic, 1);
+
+        CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
+        CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
+        CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
+
+        List list = new ArrayList();
+        list.add(writer1);
+        list.add(writer2);
+        list.add(f1);
+        FutureUtil.waitForAll(list).join();
+        Assert.assertTrue(reader1.hasMoreEvents());
+        Assert.assertNotNull(reader1.readNext());
+        Assert.assertTrue(reader2.hasMoreEvents());
+        Assert.assertNotNull(reader2.readNext());
+        reader1.close();
+        reader2.close();
+        writer1.get().close();
+        writer2.get().close();
+    }
 }