You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/03/10 17:55:32 UTC
[pulsar] branch master updated: [pulsar-broker] add support to
configure max pending publish request per connection (#5742)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a9d56ea [pulsar-broker] add support to configure max pending publish request per connection (#5742)
a9d56ea is described below
commit a9d56ea20ba938b31233edb87345e4a9f62b6041
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Mar 10 10:55:18 2020 -0700
[pulsar-broker] add support to configure max pending publish request per connection (#5742)
---
conf/broker.conf | 4 ++++
conf/standalone.conf | 4 ++++
.../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++
.../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 10 ++++++----
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 5989880..08f370c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -114,6 +114,10 @@ brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
# Topics that are inactive for longer than this value will be deleted
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
+# Max pending publish requests per connection to avoid keeping large number of pending
+# requests in memory. Default: 1000
+maxPendingPublishdRequestsPerConnection=1000;
+
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f351d8d..81bbd43 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -75,6 +75,10 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60
+# Max pending publish requests per connection to avoid keeping large number of pending
+# requests in memory. Default: 1000
+maxPendingPublishdRequestsPerConnection=1000;
+
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0d554f0..9d2d79d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -276,6 +276,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
+ doc = "Max pending publish requests per connection to avoid keeping large number of pending "
+ + "requests in memory. Default: 1000"
+ )
+ private int maxPendingPublishdRequestsPerConnection = 1000;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired messages"
)
private int messageExpiryCheckIntervalInMinutes = 5;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1037952..b91bce8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -142,8 +142,8 @@ public class ServerCnx extends PulsarHandler {
// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write spikes on the broker.
- private static final int MaxPendingSendRequests = 1000;
- private static final int ResumeReadsThreshold = MaxPendingSendRequests / 2;
+ private final int maxPendingSendRequests;
+ private final int resumeReadsThreshold;
private int pendingSendRequest = 0;
private final String replicatorPrefix;
private String clientVersion = null;
@@ -183,6 +183,8 @@ public class ServerCnx extends PulsarHandler {
this.authenticateOriginalAuthData = service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
+ this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
+ this.resumeReadsThreshold = maxPendingSendRequests / 2;
}
@Override
@@ -1759,7 +1761,7 @@ public class ServerCnx extends PulsarHandler {
private void startSendOperation(Producer producer, int msgSize) {
messagePublishBufferSize += msgSize;
boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
- if (++pendingSendRequest == MaxPendingSendRequests || isPublishRateExceeded) {
+ if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
ctx.channel().config().setAutoRead(false);
@@ -1774,7 +1776,7 @@ public class ServerCnx extends PulsarHandler {
void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
messagePublishBufferSize -= msgSize;
- if (--pendingSendRequest == ResumeReadsThreshold) {
+ if (--pendingSendRequest == resumeReadsThreshold) {
// Resume reading from socket
ctx.channel().config().setAutoRead(true);
// triggers channel read if autoRead couldn't trigger it