You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:55 UTC
[pulsar] 01/06: [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a85fdcec5cb3ed311f50206ac650ae0cc0f68b94
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri May 27 10:52:13 2022 +0300
[Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
(cherry picked from commit ec52320f16b1f7e04f4cef16dc3082779dfd4d50)
---
.../java/org/apache/pulsar/broker/namespace/NamespaceService.java | 6 ++++--
.../src/main/java/org/apache/pulsar/compaction/CompactorTool.java | 5 +++--
.../java/org/apache/pulsar/functions/instance/InstanceUtils.java | 4 +++-
.../pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java | 5 ++++-
.../main/java/org/apache/pulsar/functions/worker/WorkerUtils.java | 5 ++++-
.../src/main/java/org/apache/pulsar/websocket/WebSocketService.java | 2 ++
6 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58abf48d9b0..9cc427aae67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -1253,8 +1254,9 @@ public class NamespaceService implements AutoCloseable {
return namespaceClients.computeIfAbsent(cluster, key -> {
try {
ClientBuilder clientBuilder = PulsarClient.builder()
- .enableTcpNoDelay(false)
- .statsInterval(0, TimeUnit.SECONDS);
+ .memoryLimit(0, SizeUnit.BYTES)
+ .enableTcpNoDelay(false)
+ .statsInterval(0, TimeUnit.SECONDS);
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 691217d9d77..59dd4c20aab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
@@ -104,8 +105,8 @@ public class CompactorTool {
);
}
- ClientBuilder clientBuilder = PulsarClient.builder();
-
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES);
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index d52c7dbefdb..d19e79d4026 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -161,7 +161,9 @@ public class InstanceUtils {
Optional<Long> memoryLimit) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
- clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+ clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES)
+ .serviceUrl(pulsarServiceUrl);
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index c86e1d5f2d6..9462463c05a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test;
import java.util.Map;
import java.util.Optional;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockStatic;
@@ -79,7 +81,7 @@ public class ThreadRuntimeFactoryTest {
ClientBuilder clientBuilder = testMemoryLimit(null, null);
- Mockito.verify(clientBuilder, Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
+ Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
}
@Test
@@ -110,6 +112,7 @@ public class ThreadRuntimeFactoryTest {
ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+ doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(), any());
ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new ThreadRuntimeFactoryConfig();
threadRuntimeFactoryConfig.setThreadGroupName("foo");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index d50d57ee27c..828b4e16516 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.functions.WorkerInfo;
@@ -295,7 +296,9 @@ public final class WorkerUtils {
WorkerConfig workerConfig) {
try {
- ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES)
+ .serviceUrl(pulsarServiceUrl);
if (workerConfig != null) {
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index dbac405a0e0..a57c6c491e7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -175,6 +176,7 @@ public class WebSocketService implements Closeable {
private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
ClientBuilder clientBuilder = PulsarClient.builder() //
+ .memoryLimit(0, SizeUnit.BYTES)
.statsInterval(0, TimeUnit.SECONDS) //
.enableTls(config.isTlsEnabled()) //
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //