You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/05/25 00:49:31 UTC
[pulsar] branch master updated: [Broker] Disable memory limit controller for broker client and replication clients (#15723)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 563a7cb2341 [Broker] Disable memory limit controller for broker client and replication clients (#15723)
563a7cb2341 is described below
commit 563a7cb2341e74deba4d2aa3ee0470c288ab49eb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed May 25 03:49:21 2022 +0300
[Broker] Disable memory limit controller for broker client and replication clients (#15723)
- disable memory limit by default for broker client and replication clients
- restore maxPendingMessages and maxPendingMessagesAcrossPartitions when memory limit is
disabled so that pre-PIP-120 default configuration is restored when limit is disabled
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++++
.../org/apache/pulsar/broker/service/BrokerService.java | 5 +++++
.../apache/pulsar/client/impl/MemoryLimitController.java | 4 ++++
.../org/apache/pulsar/client/impl/PulsarClientImpl.java | 13 ++++++++++++-
4 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b4e13f4fcf7..e3349a9f964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1397,6 +1397,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (this.client == null) {
try {
ClientConfigurationData conf = new ClientConfigurationData();
+
+ // Disable memory limit for broker client
+ conf.setMemoryLimitBytes(0);
+
conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls : this.brokerServiceUrl);
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bb61e06eada..daea3e43828 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -126,6 +126,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1176,6 +1177,10 @@ public class BrokerService implements Closeable {
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
+
+ // Disable memory limit for replication client
+ clientBuilder.memoryLimit(0, SizeUnit.BYTES);
+
if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
} else if (pulsar.getConfiguration().isAuthenticationEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index e4d8388a02e..086959078a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -106,4 +106,8 @@ public class MemoryLimitController {
public double currentUsagePercent() {
return 1.0 * currentUsage.get() / memoryLimit;
}
+
+ public boolean isMemoryLimited() {
+ return memoryLimit > 0;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e00f1f2e97c..b6f4050b452 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -95,6 +95,10 @@ public class PulsarClientImpl implements PulsarClient {
private static final int CLOSE_TIMEOUT_SECONDS = 60;
private static final double THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
+ // default limits for producers when memory limit controller is disabled
+ private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 1000;
+ private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;
+
protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
private LookupService lookup;
@@ -262,7 +266,14 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
- return new ProducerBuilderImpl<>(this, schema);
+ ProducerBuilderImpl<T> producerBuilder = new ProducerBuilderImpl<>(this, schema);
+ if (!memoryLimitController.isMemoryLimited()) {
+ // set default limits for producers when memory limit controller is disabled
+ producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
+ producerBuilder.maxPendingMessagesAcrossPartitions(
+ NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
+ }
+ return producerBuilder;
}
@Override