You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:35:26 UTC

[pulsar] 02/02: [Broker] Use shared executors for broker and geo-replication clients (#13839)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1bd15b7bfd4fefb94c692153609b4bb4f318c98e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Jan 20 20:16:19 2022 +0200

    [Broker] Use shared executors for broker and geo-replication clients (#13839)
    
    * [Broker] Use shared executors for broker clients and geo-replication clients
    
    * Remove brokerClientNumIOThreads configuration key and default to 1
    
    * Revisit the shared timer creation
    
    - don't ever make it a daemon thread
    
    (cherry picked from commit 4924e6d54a8fa4fb1a01f48644c23c625d0407f2)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 34 +++++++++++++++++++++-
 .../pulsar/broker/namespace/NamespaceService.java  |  3 +-
 .../pulsar/broker/service/BrokerService.java       |  3 +-
 3 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a95ba68344a..9c799603df8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -234,6 +235,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     private final Consumer<Integer> processTerminator;
     protected final EventLoopGroup ioEventLoopGroup;
+    private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
+    private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
+    private final Timer brokerClientSharedTimer;
 
     private MetricsGenerator metricsGenerator;
 
@@ -326,6 +330,18 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
         this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
                 new DefaultThreadFactory("pulsar-io"));
+        // the internal executor is not used in the broker client or replication clients since this executor is
+        // used for consumers and the transaction support in the client.
+        // since an instance is required, a single threaded shared instance is used for all broker client instances
+        this.brokerClientSharedInternalExecutorProvider =
+                new ExecutorProvider(1, "broker-client-shared-internal-executor");
+        // the external executor is not used in the broker client or replication clients since this executor is
+        // used for consumer listeners.
+        // since an instance is required, a single threaded shared instance is used for all broker client instances
+        this.brokerClientSharedExternalExecutorProvider =
+                new ExecutorProvider(1, "broker-client-shared-external-executor");
+        this.brokerClientSharedTimer =
+                new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
     }
 
     public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -518,6 +534,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 transactionExecutorProvider.shutdownNow();
             }
 
+            brokerClientSharedExternalExecutorProvider.shutdownNow();
+            brokerClientSharedInternalExecutorProvider.shutdownNow();
+            brokerClientSharedTimer.stop();
             ioEventLoopGroup.shutdownGracefully();
 
             // add timeout handling for closing executors
@@ -1344,6 +1363,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return this.offloaderScheduler;
     }
 
+    public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
+            throws PulsarClientException {
+        return PulsarClientImpl.builder()
+                .conf(clientConf)
+                .eventLoopGroup(ioEventLoopGroup)
+                .timer(brokerClientSharedTimer)
+                .internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
+                .externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
+                .build();
+    }
+
     public synchronized PulsarClient getClient() throws PulsarServerException {
         if (this.client == null) {
             try {
@@ -1386,7 +1416,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                             this.getConfiguration().getBrokerClientAuthenticationPlugin(),
                             this.getConfiguration().getBrokerClientAuthenticationParameters()));
                 }
-                this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
+
+                conf.setStatsIntervalSeconds(0);
+                this.client = createClientImpl(conf);
             } catch (Exception e) {
                 throw new PulsarServerException(e);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fbef655d489..933e750242e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -25,7 +25,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
-import io.netty.channel.EventLoopGroup;
 import io.prometheus.client.Counter;
 import java.net.URI;
 import java.net.URL;
@@ -1306,7 +1305,7 @@ public class NamespaceService implements AutoCloseable {
 
                 // Share all the IO threads across broker and client connections
                 ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
-                return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor());
+                return pulsar.createClientImpl(conf);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dfd8db1a732..b32f627bd01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -124,7 +124,6 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1182,7 +1181,7 @@ public class BrokerService implements Closeable {
                 }
                 // Share all the IO threads across broker and client connections
                 ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
-                return new PulsarClientImpl(conf, workerGroup);
+                return pulsar.createClientImpl(conf);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }