You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/12 06:10:37 UTC

[pulsar] branch master updated: [WebSocket Proxy] Make websocket proxy thread pool size configurable (#15117)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 189d23581b7 [WebSocket Proxy] Make websocket proxy thread pool size configurable (#15117)
189d23581b7 is described below

commit 189d23581b738798a277e026dc85dfe60b8df4d4
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Apr 12 09:10:31 2022 +0300

    [WebSocket Proxy] Make websocket proxy thread pool size configurable (#15117)
    
    - the WebSocket service thread pool size was previously hard-coded to 20 threads
    - make the thread pool size configurable via webSocketNumServiceThreads
    - remove the unused orderedExecutor
---
 conf/broker.conf                                              |  3 +++
 conf/websocket.conf                                           |  3 +++
 .../java/org/apache/pulsar/broker/ServiceConfiguration.java   |  5 +++++
 .../java/org/apache/pulsar/websocket/WebSocketService.java    | 11 ++++-------
 .../pulsar/websocket/service/WebSocketProxyConfiguration.java | 11 +++--------
 5 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1f3da82ab16..f5cb1ade396 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1219,6 +1219,9 @@ webSocketServiceEnabled=false
 # Number of IO threads in Pulsar Client used in WebSocket proxy
 webSocketNumIoThreads=
 
+# Number of threads used by the WebSocket service. Default is 20
+webSocketNumServiceThreads=
+
 # Number of connections per Broker in Pulsar Client used in WebSocket proxy
 webSocketConnectionsPerBroker=
 
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 7acbf01d83c..eff639838fe 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -52,6 +52,9 @@ clusterName=
 # Number of IO threads in Pulsar Client used in WebSocket proxy
 webSocketNumIoThreads=
 
+# Number of threads used by the WebSocket service. Default is 20
+webSocketNumServiceThreads=
+
 # Number of threads to use in HTTP server. Default is Runtime.getRuntime().availableProcessors()
 numHttpServerThreads=
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 48ab4c1f982..eb3f86ac02e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2270,6 +2270,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Number of IO threads in Pulsar Client used in WebSocket proxy"
     )
     private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
+
+    @FieldContext(category = CATEGORY_WEBSOCKET,
+            doc = "Number of threads used by Websocket service")
+    private int webSocketNumServiceThreads = 20;
+
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy"
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 5a81d9f21a2..ab17239da02 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.servlet.ServletException;
 import javax.websocket.DeploymentException;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -59,11 +58,7 @@ public class WebSocketService implements Closeable {
     AuthorizationService authorizationService;
     PulsarClient pulsarClient;
 
-    private final ScheduledExecutorService executor = Executors
-            .newScheduledThreadPool(WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS,
-                    new DefaultThreadFactory("pulsar-websocket"));
-    private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder()
-            .numThreads(WebSocketProxyConfiguration.GLOBAL_ZK_THREADS).name("pulsar-websocket-ordered").build();
+    private final ScheduledExecutorService executor;
     private PulsarResources pulsarResources;
     private MetadataStoreExtended configMetadataStore;
     private ServiceConfiguration config;
@@ -80,6 +75,9 @@ public class WebSocketService implements Closeable {
 
     public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
         this.config = config;
+        this.executor = Executors
+                .newScheduledThreadPool(config.getWebSocketNumServiceThreads(),
+                        new DefaultThreadFactory("pulsar-websocket"));
         this.localCluster = localCluster;
         this.topicProducerMap =
                 ConcurrentOpenHashMap.<String,
@@ -145,7 +143,6 @@ public class WebSocketService implements Closeable {
         }
 
         executor.shutdown();
-        orderedExecutor.shutdown();
     }
 
     public AuthenticationService getAuthenticationService() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index ad9b4be0228..e69400ef04a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -31,14 +31,6 @@ import org.apache.pulsar.common.configuration.PulsarConfiguration;
 @Getter
 @Setter
 public class WebSocketProxyConfiguration implements PulsarConfiguration {
-
-    // Number of threads used by Proxy server
-    public static final int PROXY_SERVER_EXECUTOR_THREADS = 2 * Runtime.getRuntime().availableProcessors();
-    // Number of threads used by Websocket service
-    public static final int WEBSOCKET_SERVICE_THREADS = 20;
-    // Number of threads used by Global ZK
-    public static final int GLOBAL_ZK_THREADS = 8;
-
     @FieldContext(required = true, doc = "Name of the cluster to which this broker belongs to")
     private String clusterName;
 
@@ -140,6 +132,9 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     @FieldContext(doc = "Number of threads to used in HTTP server")
     private int numHttpServerThreads = Math.max(6, Runtime.getRuntime().availableProcessors());
 
+    @FieldContext(doc = "Number of threads used by Websocket service")
+    private int webSocketNumServiceThreads = 20;
+
     @FieldContext(doc = "Number of connections per broker in Pulsar client used in WebSocket proxy")
     private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();