You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/05/25 00:49:31 UTC

[pulsar] branch master updated: [Broker] Disable memory limit controller for broker client and replication clients (#15723)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 563a7cb2341 [Broker] Disable memory limit controller for broker client and replication clients (#15723)
563a7cb2341 is described below

commit 563a7cb2341e74deba4d2aa3ee0470c288ab49eb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed May 25 03:49:21 2022 +0300

    [Broker] Disable memory limit controller for broker client and replication clients (#15723)
    
    - disable memory limit by default for broker client and replication clients
    - restore the maxPendingMessages and maxPendingMessagesAcrossPartitions defaults when the
      memory limit is disabled, so that the pre-PIP-120 default configuration applies again
---
 .../main/java/org/apache/pulsar/broker/PulsarService.java   |  4 ++++
 .../org/apache/pulsar/broker/service/BrokerService.java     |  5 +++++
 .../apache/pulsar/client/impl/MemoryLimitController.java    |  4 ++++
 .../org/apache/pulsar/client/impl/PulsarClientImpl.java     | 13 ++++++++++++-
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b4e13f4fcf7..e3349a9f964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1397,6 +1397,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         if (this.client == null) {
             try {
                 ClientConfigurationData conf = new ClientConfigurationData();
+
+                // Disable memory limit for broker client
+                conf.setMemoryLimitBytes(0);
+
                 conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
                                 ? this.brokerServiceUrlTls : this.brokerServiceUrl);
                 conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bb61e06eada..daea3e43828 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1176,6 +1177,10 @@ public class BrokerService implements Closeable {
                         .enableTcpNoDelay(false)
                         .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
                         .statsInterval(0, TimeUnit.SECONDS);
+
+                // Disable memory limit for replication client
+                clientBuilder.memoryLimit(0, SizeUnit.BYTES);
+
                 if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
                     clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
                 } else if (pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index e4d8388a02e..086959078a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -106,4 +106,8 @@ public class MemoryLimitController {
     public double currentUsagePercent() {
         return 1.0 * currentUsage.get() / memoryLimit;
     }
+
+    public boolean isMemoryLimited() {
+        return memoryLimit > 0;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e00f1f2e97c..b6f4050b452 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -95,6 +95,10 @@ public class PulsarClientImpl implements PulsarClient {
     private static final int CLOSE_TIMEOUT_SECONDS = 60;
     private static final double THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
 
+    // default limits for producers when memory limit controller is disabled
+    private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 1000;
+    private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
+
     protected final ClientConfigurationData conf;
     private final boolean createdExecutorProviders;
     private LookupService lookup;
@@ -262,7 +266,14 @@ public class PulsarClientImpl implements PulsarClient {
 
     @Override
     public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
-        return new ProducerBuilderImpl<>(this, schema);
+        ProducerBuilderImpl<T> producerBuilder = new ProducerBuilderImpl<>(this, schema);
+        if (!memoryLimitController.isMemoryLimited()) {
+            // set default limits for producers when memory limit controller is disabled
+            producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
+            producerBuilder.maxPendingMessagesAcrossPartitions(
+                    NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+        }
+        return producerBuilder;
     }
 
     @Override