You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/12 06:10:37 UTC
[pulsar] branch master updated: [WebSocket Proxy] Make websocket proxy thread pool size configurable (#15117)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 189d23581b7 [WebSocket Proxy] Make websocket proxy thread pool size configurable (#15117)
189d23581b7 is described below
commit 189d23581b738798a277e026dc85dfe60b8df4d4
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Apr 12 09:10:31 2022 +0300
[WebSocket Proxy] Make websocket proxy thread pool size configurable (#15117)
- it was hard coded to 20 threads
- make thread pool size configurable by webSocketNumServiceThreads
- remove unused orderedExecutor
---
conf/broker.conf | 3 +++
conf/websocket.conf | 3 +++
.../java/org/apache/pulsar/broker/ServiceConfiguration.java | 5 +++++
.../java/org/apache/pulsar/websocket/WebSocketService.java | 11 ++++-------
.../pulsar/websocket/service/WebSocketProxyConfiguration.java | 11 +++--------
5 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 1f3da82ab16..f5cb1ade396 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1219,6 +1219,9 @@ webSocketServiceEnabled=false
# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=
+# Number of threads used by Websocket service
+webSocketNumServiceThreads=
+
# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 7acbf01d83c..eff639838fe 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -52,6 +52,9 @@ clusterName=
# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=
+# Number of threads used by Websocket service
+webSocketNumServiceThreads=
+
# Number of threads to use in HTTP server. Default is Runtime.getRuntime().availableProcessors()
numHttpServerThreads=
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 48ab4c1f982..eb3f86ac02e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2270,6 +2270,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Number of IO threads in Pulsar Client used in WebSocket proxy"
)
private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
+
+ @FieldContext(category = CATEGORY_WEBSOCKET,
+ doc = "Number of threads used by Websocket service")
+ private int webSocketNumServiceThreads = 20;
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy"
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 5a81d9f21a2..ab17239da02 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -59,11 +58,7 @@ public class WebSocketService implements Closeable {
AuthorizationService authorizationService;
PulsarClient pulsarClient;
- private final ScheduledExecutorService executor = Executors
- .newScheduledThreadPool(WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS,
- new DefaultThreadFactory("pulsar-websocket"));
- private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder()
- .numThreads(WebSocketProxyConfiguration.GLOBAL_ZK_THREADS).name("pulsar-websocket-ordered").build();
+ private final ScheduledExecutorService executor;
private PulsarResources pulsarResources;
private MetadataStoreExtended configMetadataStore;
private ServiceConfiguration config;
@@ -80,6 +75,9 @@ public class WebSocketService implements Closeable {
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
this.config = config;
+ this.executor = Executors
+ .newScheduledThreadPool(config.getWebSocketNumServiceThreads(),
+ new DefaultThreadFactory("pulsar-websocket"));
this.localCluster = localCluster;
this.topicProducerMap =
ConcurrentOpenHashMap.<String,
@@ -145,7 +143,6 @@ public class WebSocketService implements Closeable {
}
executor.shutdown();
- orderedExecutor.shutdown();
}
public AuthenticationService getAuthenticationService() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index ad9b4be0228..e69400ef04a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -31,14 +31,6 @@ import org.apache.pulsar.common.configuration.PulsarConfiguration;
@Getter
@Setter
public class WebSocketProxyConfiguration implements PulsarConfiguration {
-
- // Number of threads used by Proxy server
- public static final int PROXY_SERVER_EXECUTOR_THREADS = 2 * Runtime.getRuntime().availableProcessors();
- // Number of threads used by Websocket service
- public static final int WEBSOCKET_SERVICE_THREADS = 20;
- // Number of threads used by Global ZK
- public static final int GLOBAL_ZK_THREADS = 8;
-
@FieldContext(required = true, doc = "Name of the cluster to which this broker belongs to")
private String clusterName;
@@ -140,6 +132,9 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
@FieldContext(doc = "Number of threads to used in HTTP server")
private int numHttpServerThreads = Math.max(6, Runtime.getRuntime().availableProcessors());
+ @FieldContext(doc = "Number of threads used by Websocket service")
+ private int webSocketNumServiceThreads = 20;
+
@FieldContext(doc = "Number of connections per broker in Pulsar client used in WebSocket proxy")
private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();