You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/27 03:30:04 UTC
[pulsar] branch master updated: [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5b7c5c62965 [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)
5b7c5c62965 is described below
commit 5b7c5c62965151c35d9e5b9f0b50bb93b0beb2c3
Author: feynmanlin <31...@qq.com>
AuthorDate: Thu Oct 27 11:29:53 2022 +0800
[improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)
---
.../org/apache/pulsar/broker/PulsarService.java | 11 ++++----
.../broker/TransactionMetadataStoreService.java | 4 +--
.../loadbalance/impl/ModularLoadManagerImpl.java | 5 ++--
.../loadbalance/impl/SimpleLoadManagerImpl.java | 4 +--
.../pulsar/broker/service/BrokerService.java | 33 ++++++++++++----------
.../metrics/PrometheusMetricsProvider.java | 5 ++--
.../pulsar/broker/tools/LoadReportCommand.java | 4 ++-
.../pulsar/client/impl/AutoClusterFailover.java | 4 +--
.../client/impl/ControlledClusterFailover.java | 4 +--
.../pulsar/client/impl/PulsarClientImpl.java | 5 ++--
.../pulsar/client/util/ExecutorProvider.java | 8 ++++--
.../pulsar/functions/instance/InstanceCache.java | 4 +--
.../worker/ClusterServiceCoordinator.java | 4 +--
13 files changed, 53 insertions(+), 42 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 17fb80d9ee4..e31256177b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -315,13 +315,13 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.config = config;
this.processTerminator = processTerminator;
this.loadManagerExecutor = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
+ .newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
this.workerConfig = workerConfig;
this.functionWorkerService = functionWorkerService;
this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(),
- new DefaultThreadFactory("pulsar"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar"));
this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(),
- new DefaultThreadFactory("zk-cache-callback"));
+ new ExecutorProvider.ExtendedThreadFactory("zk-cache-callback"));
if (config.isTransactionCoordinatorEnabled()) {
this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration()
@@ -615,7 +615,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
+ new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown"));
FutureUtil.addTimeoutHandling(future,
Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
@@ -1425,7 +1425,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
protected synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
- compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
+ compactorExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("compaction"));
}
return this.compactorExecutor;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9ee1657f137..3d9e6924d11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -24,7 +24,6 @@ import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
@@ -52,6 +51,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.FutureUtil;
@@ -93,7 +93,7 @@ public class TransactionMetadataStoreService {
private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
private final ThreadFactory threadFactory =
- new DefaultThreadFactory("transaction-coordinator-thread-factory");
+ new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d04c64c163a..c14768eed5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.loadbalance.impl;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
@@ -60,6 +59,7 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -212,7 +212,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
preallocatedBundleToBroker = new ConcurrentHashMap<>();
- scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 5b2098b8e4d..c2c0d1947c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -26,7 +26,6 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
@@ -55,6 +54,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -189,7 +189,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-simple-load-manager"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-simple-load-manager"));
this.sortedRankings.set(new TreeMap<>());
this.currentLoadReports = new HashMap<>();
this.resourceUnitRankings = new HashMap<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 14fb9a9a4b4..b410fb48b22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
import org.apache.pulsar.common.configuration.FieldContext;
@@ -311,13 +312,14 @@ public class BrokerService implements Closeable {
this.topicOrderedExecutor = OrderedExecutor.newBuilder()
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
.name("broker-topic-workers").build();
- final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
+ final DefaultThreadFactory acceptorThreadFactory =
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-acceptor");
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
- this.statsUpdater = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
+ this.statsUpdater = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
@@ -327,22 +329,22 @@ public class BrokerService implements Closeable {
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
- this.inactivityMonitor = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-inactivity-monitor"));
- this.messageExpiryMonitor = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
+ this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-inactivity-monitor"));
+ this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-msg-expiry-monitor"));
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-compaction-monitor"));
- this.consumedLedgersMonitor = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-compaction-monitor"));
+ this.consumedLedgersMonitor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("consumed-Ledgers-monitor"));
this.topicPublishRateLimiterMonitor =
new PublishRateLimiterMonitor("pulsar-topic-publish-rate-limiter-monitor");
this.brokerPublishRateLimiterMonitor =
new PublishRateLimiterMonitor("pulsar-broker-publish-rate-limiter-monitor");
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
- this.backlogQuotaChecker = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+ this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
@@ -429,7 +431,8 @@ public class BrokerService implements Closeable {
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
EventLoopUtil.enableTriggeredMode(bootstrap);
- DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol);
+ DefaultThreadFactory defaultThreadFactory =
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-ph-" + protocol);
EventLoopGroup dedicatedWorkerGroup =
EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
@@ -551,7 +554,7 @@ public class BrokerService implements Closeable {
int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
this.deduplicationSnapshotMonitor =
- Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(
+ Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(
"deduplication-snapshot-monitor"));
deduplicationSnapshotMonitor.scheduleAtFixedRate(safeRun(() -> forEachTopic(
Topic::checkDeduplicationSnapshot))
@@ -685,7 +688,7 @@ public class BrokerService implements Closeable {
stop();
}
//start monitor.
- scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(name));
+ scheduler = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(name));
// schedule task that sums up publish-rate across all cnx on a topic ,
// and check the rate limit exceeded or not.
scheduler.scheduleAtFixedRate(safeRun(checkTask), tickTimeMs, tickTimeMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
index 3097d261313..73c0609c556 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.stats.prometheus.metrics;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Collector;
import java.io.IOException;
import java.io.Writer;
@@ -34,6 +33,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.util.ExecutorProvider;
/**
* A <i>Prometheus</i> based {@link StatsProvider} implementation.
@@ -90,7 +90,8 @@ public class PrometheusMetricsProvider implements StatsProvider {
@Override
public void start(Configuration conf) {
- executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
+ executor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("metrics"));
int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
index 9eaf8c1196d..935e3a9f2fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.tools.LoadReportCommand.Flags;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -88,7 +89,8 @@ public class LoadReportCommand extends CliCommand<CliFlags, Flags> {
spec.console().println("--------------------------------------");
spec.console().println();
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("load-report"));
BrokerHostUsage hostUsage;
try {
if (isLinux) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index f2b4449aa0e..94e8026b701 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Strings;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -38,6 +37,7 @@ import org.apache.pulsar.client.api.AutoClusterFailoverBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
@Slf4j
@Data
@@ -80,7 +80,7 @@ public class AutoClusterFailover implements ServiceUrlProvider {
this.intervalMs = builder.checkIntervalMs;
this.resolver = new PulsarServiceNameResolver();
this.executor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-service-provider"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 3fb503cd0f7..4ab1977d0fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
@@ -74,7 +74,7 @@ public class ControlledClusterFailover implements ServiceUrlProvider {
this.currentPulsarServiceUrl = builder.defaultServiceUrl;
this.interval = builder.interval;
this.executor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-service-provider"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
this.httpClient = buildHttpClient();
this.requestBuilder = httpClient.prepareGet(builder.urlProvider)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 08adad46136..d964328d59c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -26,7 +26,6 @@ import com.google.common.cache.LoadingCache;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Clock;
@@ -756,7 +755,7 @@ public class PulsarClientImpl implements PulsarClient {
// would happen
CompletableFuture<Void> combinedFuture = FutureUtil.waitForAll(futures);
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory("pulsar-client-shutdown-timeout-scheduler"));
+ new ExecutorProvider.ExtendedThreadFactory("pulsar-client-shutdown-timeout-scheduler"));
FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
PulsarClientImpl.class, "closeAsync"));
@@ -1087,7 +1086,7 @@ public class PulsarClientImpl implements PulsarClient {
}
private static ThreadFactory getThreadFactory(String poolName) {
- return new DefaultThreadFactory(poolName, Thread.currentThread().isDaemon());
+ return new ExecutorProvider.ExtendedThreadFactory(poolName, Thread.currentThread().isDaemon());
}
void cleanupProducer(ProducerBase<?> producer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index a7a97347002..037aef411a0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -40,10 +40,12 @@ public class ExecutorProvider {
private final String poolName;
private volatile boolean isShutdown;
- protected static class ExtendedThreadFactory extends DefaultThreadFactory {
-
+ public static class ExtendedThreadFactory extends DefaultThreadFactory {
@Getter
private Thread thread;
+ public ExtendedThreadFactory(String poolName) {
+ super(poolName, false);
+ }
public ExtendedThreadFactory(String poolName, boolean daemon) {
super(poolName, daemon);
}
@@ -51,6 +53,8 @@ public class ExecutorProvider {
@Override
public Thread newThread(Runnable r) {
thread = super.newThread(r);
+ thread.setUncaughtExceptionHandler((t, e) ->
+ log.error("Thread {} got uncaught Exception", t.getName(), e));
return thread;
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 988f7ce2e28..c9aea1148b1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,11 +18,11 @@
*/
package org.apache.pulsar.functions.instance;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import lombok.Getter;
+import org.apache.pulsar.client.util.ExecutorProvider;
public class InstanceCache {
@@ -33,7 +33,7 @@ public class InstanceCache {
private InstanceCache() {
ThreadFactory namedThreadFactory =
- new DefaultThreadFactory("function-timer-thread");
+ new ExecutorProvider.ExtendedThreadFactory("function-timer-thread");
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index f52259c6296..e01b5bc6943 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.functions.worker;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -30,6 +29,7 @@ import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.util.ExecutorProvider;
@Slf4j
public class ClusterServiceCoordinator implements AutoCloseable {
@@ -54,7 +54,7 @@ public class ClusterServiceCoordinator implements AutoCloseable {
public ClusterServiceCoordinator(String workerId, LeaderService leaderService, Supplier<Boolean> isLeader) {
this(workerId, leaderService, isLeader, Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build()));
+ new ExecutorProvider.ExtendedThreadFactory("cluster-service-coordinator-timer")));
}
@VisibleForTesting