You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:35:26 UTC
[pulsar] 02/02: [Broker] Use shared executors for broker and geo-replication clients (#13839)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1bd15b7bfd4fefb94c692153609b4bb4f318c98e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Jan 20 20:16:19 2022 +0200
[Broker] Use shared executors for broker and geo-replication clients (#13839)
* [Broker] Use shared executors for broker clients and geo-replication clients
* Remove brokerClientNumIOThreads configuration key and default to 1
* Revisit the shared timer creation
- don't ever make it a daemon thread
(cherry picked from commit 4924e6d54a8fa4fb1a01f48644c23c625d0407f2)
---
.../org/apache/pulsar/broker/PulsarService.java | 34 +++++++++++++++++++++-
.../pulsar/broker/namespace/NamespaceService.java | 3 +-
.../pulsar/broker/service/BrokerService.java | 3 +-
3 files changed, 35 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a95ba68344a..9c799603df8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -234,6 +235,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Consumer<Integer> processTerminator;
protected final EventLoopGroup ioEventLoopGroup;
+ private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
+ private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
+ private final Timer brokerClientSharedTimer;
private MetricsGenerator metricsGenerator;
@@ -326,6 +330,18 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
+ // The internal executor is not used by the broker client or the replication clients; in the client, this
+ // executor serves consumers and transaction support.
+ // Since an instance is still required, one single-threaded instance is shared by all broker client instances.
+ this.brokerClientSharedInternalExecutorProvider =
+ new ExecutorProvider(1, "broker-client-shared-internal-executor");
+ // The external executor is not used by the broker client or the replication clients; in the client, this
+ // executor runs consumer listeners.
+ // Since an instance is still required, one single-threaded instance is shared by all broker client instances.
+ this.brokerClientSharedExternalExecutorProvider =
+ new ExecutorProvider(1, "broker-client-shared-external-executor");
+ this.brokerClientSharedTimer =
+ new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
}
public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -518,6 +534,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
transactionExecutorProvider.shutdownNow();
}
+ brokerClientSharedExternalExecutorProvider.shutdownNow();
+ brokerClientSharedInternalExecutorProvider.shutdownNow();
+ brokerClientSharedTimer.stop();
ioEventLoopGroup.shutdownGracefully();
// add timeout handling for closing executors
@@ -1344,6 +1363,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
return this.offloaderScheduler;
}
+ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
+ throws PulsarClientException { // creates a client wired to the broker's shared resources
+ return PulsarClientImpl.builder()
+ .conf(clientConf) // caller-supplied client configuration
+ .eventLoopGroup(ioEventLoopGroup) // reuse the broker's "pulsar-io" event loop group
+ .timer(brokerClientSharedTimer) // one timer shared by every client created here
+ .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) // shared single-threaded provider
+ .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) // shared single-threaded provider
+ .build();
+ }
+
public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
@@ -1386,7 +1416,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.getConfiguration().getBrokerClientAuthenticationPlugin(),
this.getConfiguration().getBrokerClientAuthenticationParameters()));
}
- this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
+
+ conf.setStatsIntervalSeconds(0);
+ this.client = createClientImpl(conf);
} catch (Exception e) {
throw new PulsarServerException(e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fbef655d489..933e750242e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -25,7 +25,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
-import io.netty.channel.EventLoopGroup;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
@@ -1306,7 +1305,7 @@ public class NamespaceService implements AutoCloseable {
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
- return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor());
+ return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dfd8db1a732..b32f627bd01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -124,7 +124,6 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -1182,7 +1181,7 @@ public class BrokerService implements Closeable {
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
- return new PulsarClientImpl(conf, workerGroup);
+ return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}