You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/03/10 17:55:32 UTC

[pulsar] branch master updated: [pulsar-broker] add support to configure max pending publish request per connection (#5742)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a9d56ea  [pulsar-broker] add support to configure max pending publish request per connection (#5742)
a9d56ea is described below

commit a9d56ea20ba938b31233edb87345e4a9f62b6041
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Mar 10 10:55:18 2020 -0700

    [pulsar-broker] add support to configure max pending publish request per connection (#5742)
---
 conf/broker.conf                                               |  4 ++++
 conf/standalone.conf                                           |  4 ++++
 .../java/org/apache/pulsar/broker/ServiceConfiguration.java    |  6 ++++++
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 10 ++++++----
 4 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 5989880..08f370c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -114,6 +114,10 @@ brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
 # Topics that are inactive for longer than this value will be deleted
 brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
 
+# Max pending publish requests per connection, to avoid keeping a large number of pending
+# requests in memory. Default: 1000
+maxPendingPublishdRequestsPerConnection=1000
+
 # How frequently to proactively check and purge expired messages
 messageExpiryCheckIntervalInMinutes=5
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f351d8d..81bbd43 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -75,6 +75,10 @@ brokerDeleteInactiveTopicsEnabled=true
 # How often to check for inactive topics
 brokerDeleteInactiveTopicsFrequencySeconds=60
 
+# Max pending publish requests per connection, to avoid keeping a large number of pending
+# requests in memory. Default: 1000
+maxPendingPublishdRequestsPerConnection=1000
+
 # How frequently to proactively check and purge expired messages
 messageExpiryCheckIntervalInMinutes=5
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0d554f0..9d2d79d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -276,6 +276,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     @FieldContext(
         category = CATEGORY_POLICIES,
+        doc = "Max pending publish requests per connection to avoid keeping large number of pending "
+                + "requests in memory. Default: 1000"
+    )
+    private int maxPendingPublishdRequestsPerConnection = 1000;
+    @FieldContext(
+        category = CATEGORY_POLICIES,
         doc = "How frequently to proactively check and purge expired messages"
     )
     private int messageExpiryCheckIntervalInMinutes = 5;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1037952..b91bce8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -142,8 +142,8 @@ public class ServerCnx extends PulsarHandler {
 
     // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write spikes on the broker.
-    private static final int MaxPendingSendRequests = 1000;
-    private static final int ResumeReadsThreshold = MaxPendingSendRequests / 2;
+    private final int maxPendingSendRequests;
+    private final int resumeReadsThreshold;
     private int pendingSendRequest = 0;
     private final String replicatorPrefix;
     private String clientVersion = null;
@@ -183,6 +183,8 @@ public class ServerCnx extends PulsarHandler {
         this.authenticateOriginalAuthData = service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
         this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced();
         this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
+        this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
+        this.resumeReadsThreshold = maxPendingSendRequests / 2;
     }
 
     @Override
@@ -1759,7 +1761,7 @@ public class ServerCnx extends PulsarHandler {
     private void startSendOperation(Producer producer, int msgSize) {
         messagePublishBufferSize += msgSize;
         boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
-        if (++pendingSendRequest == MaxPendingSendRequests || isPublishRateExceeded) {
+        if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
             // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
             // client connection, possibly shared between multiple producers
             ctx.channel().config().setAutoRead(false);
@@ -1774,7 +1776,7 @@ public class ServerCnx extends PulsarHandler {
 
     void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
         messagePublishBufferSize -= msgSize;
-        if (--pendingSendRequest == ResumeReadsThreshold) {
+        if (--pendingSendRequest == resumeReadsThreshold) {
             // Resume reading from socket
             ctx.channel().config().setAutoRead(true);
             // triggers channel read if autoRead couldn't trigger it