You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") whose target is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:55 UTC

[pulsar] 01/06: [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a85fdcec5cb3ed311f50206ac650ae0cc0f68b94
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri May 27 10:52:13 2022 +0300

    [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
    
    (cherry picked from commit ec52320f16b1f7e04f4cef16dc3082779dfd4d50)
---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java   | 6 ++++--
 .../src/main/java/org/apache/pulsar/compaction/CompactorTool.java   | 5 +++--
 .../java/org/apache/pulsar/functions/instance/InstanceUtils.java    | 4 +++-
 .../pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java   | 5 ++++-
 .../main/java/org/apache/pulsar/functions/worker/WorkerUtils.java   | 5 ++++-
 .../src/main/java/org/apache/pulsar/websocket/WebSocketService.java | 2 ++
 6 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58abf48d9b0..9cc427aae67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -1253,8 +1254,9 @@ public class NamespaceService implements AutoCloseable {
         return namespaceClients.computeIfAbsent(cluster, key -> {
             try {
                 ClientBuilder clientBuilder = PulsarClient.builder()
-                    .enableTcpNoDelay(false)
-                    .statsInterval(0, TimeUnit.SECONDS);
+                        .memoryLimit(0, SizeUnit.BYTES)
+                        .enableTcpNoDelay(false)
+                        .statsInterval(0, TimeUnit.SECONDS);
 
                 // Apply all arbitrary configuration. This must be called before setting any fields annotated as
                 // @Secret on the ClientConfigurationData object because of the way they are serialized.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 691217d9d77..59dd4c20aab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
@@ -104,8 +105,8 @@ public class CompactorTool {
             );
         }
 
-        ClientBuilder clientBuilder = PulsarClient.builder();
-
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES);
         // Apply all arbitrary configuration. This must be called before setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way they are serialized.
         // See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index d52c7dbefdb..d19e79d4026 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -161,7 +161,9 @@ public class InstanceUtils {
                                                           Optional<Long> memoryLimit) throws PulsarClientException {
         ClientBuilder clientBuilder = null;
         if (isNotBlank(pulsarServiceUrl)) {
-            clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
             if (authConfig != null) {
                 if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                         && isNotBlank(authConfig.getClientAuthenticationParameters())) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index c86e1d5f2d6..9462463c05a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mockStatic;
@@ -79,7 +81,7 @@ public class ThreadRuntimeFactoryTest {
 
         ClientBuilder clientBuilder = testMemoryLimit(null, null);
 
-        Mockito.verify(clientBuilder, Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
+        Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
     }
 
     @Test
@@ -110,6 +112,7 @@ public class ThreadRuntimeFactoryTest {
             ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
             mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
             doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+            doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(), any());
 
             ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new ThreadRuntimeFactoryConfig();
             threadRuntimeFactoryConfig.setThreadGroupName("foo");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index d50d57ee27c..828b4e16516 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -295,7 +296,9 @@ public final class WorkerUtils {
                                                WorkerConfig workerConfig) {
 
         try {
-            ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            ClientBuilder clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
 
             if (workerConfig != null) {
                 // Apply all arbitrary configuration. This must be called before setting any fields annotated as
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index dbac405a0e0..a57c6c491e7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -175,6 +176,7 @@ public class WebSocketService implements Closeable {
 
     private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
         ClientBuilder clientBuilder = PulsarClient.builder() //
+                .memoryLimit(0, SizeUnit.BYTES)
                 .statsInterval(0, TimeUnit.SECONDS) //
                 .enableTls(config.isTlsEnabled()) //
                 .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //