You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/06 15:05:27 UTC
[pulsar] branch branch-2.9 updated: [improve][broker] System topic writer/reader connection not counted. (#18369)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 1d01714348d [improve][broker] System topic writer/reader connection not counted. (#18369)
1d01714348d is described below
commit 1d01714348d3995ec5199665a1fa2a1220911688
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Nov 23 14:23:57 2022 +0800
[improve][broker] System topic writer/reader connection not counted. (#18369)
(cherry picked from commit b219ccac334c4b57df48d07e78ea381f54db1a7d)
---
.../pulsar/broker/service/AbstractTopic.java | 19 ++++++++--
.../broker/service/persistent/PersistentTopic.java | 3 ++
.../pulsar/broker/service/ReplicatorTest.java | 32 +++++++++++++++++
.../systopic/PartitionedSystemTopicTest.java | 40 +++++++++++++++++++++-
4 files changed, 90 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 7c8d3b24223..eb06a41d42f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -164,7 +164,10 @@ public abstract class AbstractTopic implements Topic {
}
}
- protected boolean isProducersExceeded() {
+ protected boolean isProducersExceeded(Producer producer) {
+ if (isSystemTopic() || producer.isRemote()) {
+ return false;
+ }
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
if (maxProducers == null) {
@@ -175,12 +178,16 @@ public abstract class AbstractTopic implements Topic {
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
.getConfiguration().getMaxProducersPerTopic();
- if (maxProducers > 0 && maxProducers <= producers.size()) {
+ if (maxProducers > 0 && maxProducers <= producers.size() && maxProducers <= getUserCreatedProducersSize()) {
return true;
}
return false;
}
+ /**
+  * Counts the producers registered on this topic for which {@code isRemote()} is false.
+  *
+  * @return the number of non-remote entries currently held in {@code producers}
+  */
+ private long getUserCreatedProducersSize() {
+     long userCreated = 0;
+     for (Producer candidate : producers.values()) {
+         // Remote producers are excluded from the count.
+         if (!candidate.isRemote()) {
+             userCreated++;
+         }
+     }
+     return userCreated;
+ }
+
protected boolean isSameAddressProducersExceeded(Producer producer) {
final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();
@@ -206,6 +213,9 @@ public abstract class AbstractTopic implements Topic {
}
protected boolean isConsumersExceededOnTopic() {
+ if (isSystemTopic()) {
+ return false;
+ }
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
@@ -225,6 +235,9 @@ public abstract class AbstractTopic implements Topic {
}
protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) {
+ if (isSystemTopic()) {
+ return false;
+ }
final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressConsumersPerTopic();
@@ -652,7 +665,7 @@ public abstract class AbstractTopic implements Topic {
}
protected void internalAddProducer(Producer producer) throws BrokerServiceException {
- if (isProducersExceeded()) {
+ if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index df90547c723..f630ed32d47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3284,6 +3284,9 @@ public class PersistentTopic extends AbstractTopic
}
private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
+ if (isSystemTopic()) {
+ return false;
+ }
//Existing subscriptions are not affected
if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) {
return false;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d9ac04c3fc2..8fa267c4e35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
@@ -1394,4 +1395,35 @@ public class ReplicatorTest extends ReplicatorTestBase {
private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
+ /**
+  * Sets a topic-level max-producers policy of 1 on two single-partition topics (one created
+  * through each cluster's admin client) inside namespaces replicated between "r1" and "r2",
+  * produces one message to each topic, and finally asserts that building a second
+  * {@code MessageProducer} for {@code dest2} on {@code url2} fails with
+  * {@code ProducerBusyException}.
+  *
+  * Presumably (per the test name) the first producer on each topic succeeds because
+  * replicator producers are not counted against the limit - confirm against AbstractTopic.
+  */
+ @Test
+ public void testReplicatorProducerNotExceed() throws Exception {
+ log.info("--- testReplicatorProducerNotExceed ---");
+ String namespace1 = "pulsar/ns11";
+ admin1.namespaces().createNamespace(namespace1);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
+ final TopicName dest1 = TopicName.get(
+ BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
+ String namespace2 = "pulsar/ns22";
+ admin2.namespaces().createNamespace(namespace2);
+ admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
+ // NOTE(review): dest2 is built from namespace1, leaving namespace2 unused after its
+ // creation above - looks like a copy/paste slip; confirm whether namespace2 was intended.
+ final TopicName dest2 = TopicName.get(
+ BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed2"));
+ // Limit each topic to a single producer via topic-level policy.
+ admin1.topics().createPartitionedTopic(dest1.toString(), 1);
+ admin1.topicPolicies().setMaxProducers(dest1.toString(), 1);
+ admin2.topics().createPartitionedTopic(dest2.toString(), 1);
+ admin2.topicPolicies().setMaxProducers(dest2.toString(), 1);
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest1);
+ log.info("--- Starting producer1 --- " + url1);
+
+ producer1.produce(1);
+
+ @Cleanup
+ MessageProducer producer2 = new MessageProducer(url2, dest2);
+ log.info("--- Starting producer2 --- " + url2);
+
+ producer2.produce(1);
+
+ // producer2 is still open here, so a second producer on dest2 must be rejected.
+ Assert.assertThrows(PulsarClientException.ProducerBusyException.class, () -> new MessageProducer(url2, dest2));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index cf45d614f0f..ce4f44dd339 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -179,7 +180,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
}
@Test
- private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
+ public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";
@@ -234,4 +235,41 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
Assert.fail("failed to create producer");
}
}
+
+ /**
+  * Checks that two readers and two writers can be opened on a namespace's topic-policies
+  * system topic client while max-consumers (namespace and topic level) and max-producers
+  * (topic level) are both set to 1 for a user topic in that namespace.
+  *
+  * The test passes when both writers and the compaction-threshold update complete, and both
+  * readers report and return at least one event.
+  */
+ @Test
+ public void testSystemTopicNotCheckExceed() throws Exception {
+     final String ns = "prop/ns-test";
+     final String topic = ns + "/topic-1";
+
+     admin.namespaces().createNamespace(ns, 2);
+     admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+
+     // Restrict consumers to 1 before opening two readers.
+     admin.namespaces().setMaxConsumersPerTopic(ns, 1);
+     admin.topicPolicies().setMaxConsumers(topic, 1);
+     NamespaceEventsSystemTopicFactory systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
+     TopicPoliciesSystemTopicClient systemTopicClientForNamespace = systemTopicFactory
+             .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
+     SystemTopicClient.Reader reader1 = systemTopicClientForNamespace.newReader();
+     SystemTopicClient.Reader reader2 = systemTopicClientForNamespace.newReader();
+
+     // Restrict producers to 1 before opening two writers.
+     admin.topicPolicies().setMaxProducers(topic, 1);
+
+     CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = systemTopicClientForNamespace.newWriterAsync();
+     CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
+     CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
+
+     // Wait for all three futures without going through a raw-typed List;
+     // join() fails the test if any of them completed exceptionally.
+     CompletableFuture.allOf(writer1, writer2, f1).join();
+     Assert.assertTrue(reader1.hasMoreEvents());
+     Assert.assertNotNull(reader1.readNext());
+     Assert.assertTrue(reader2.hasMoreEvents());
+     Assert.assertNotNull(reader2.readNext());
+     reader1.close();
+     reader2.close();
+     writer1.get().close();
+     writer2.get().close();
+ }
}