You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/27 03:30:04 UTC

[pulsar] branch master updated: [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b7c5c62965 [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)
5b7c5c62965 is described below

commit 5b7c5c62965151c35d9e5b9f0b50bb93b0beb2c3
Author: feynmanlin <31...@qq.com>
AuthorDate: Thu Oct 27 11:29:53 2022 +0800

    [improve][broker] Add UncaughtExceptionHandler for every thread pool (#18211)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 11 ++++----
 .../broker/TransactionMetadataStoreService.java    |  4 +--
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  5 ++--
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  4 +--
 .../pulsar/broker/service/BrokerService.java       | 33 ++++++++++++----------
 .../metrics/PrometheusMetricsProvider.java         |  5 ++--
 .../pulsar/broker/tools/LoadReportCommand.java     |  4 ++-
 .../pulsar/client/impl/AutoClusterFailover.java    |  4 +--
 .../client/impl/ControlledClusterFailover.java     |  4 +--
 .../pulsar/client/impl/PulsarClientImpl.java       |  5 ++--
 .../pulsar/client/util/ExecutorProvider.java       |  8 ++++--
 .../pulsar/functions/instance/InstanceCache.java   |  4 +--
 .../worker/ClusterServiceCoordinator.java          |  4 +--
 13 files changed, 53 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 17fb80d9ee4..e31256177b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -315,13 +315,13 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         this.config = config;
         this.processTerminator = processTerminator;
         this.loadManagerExecutor = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
+                .newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
         this.workerConfig = workerConfig;
         this.functionWorkerService = functionWorkerService;
         this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(),
-                new DefaultThreadFactory("pulsar"));
+                new ExecutorProvider.ExtendedThreadFactory("pulsar"));
         this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(),
-                new DefaultThreadFactory("zk-cache-callback"));
+                new ExecutorProvider.ExtendedThreadFactory("zk-cache-callback"));
 
         if (config.isTransactionCoordinatorEnabled()) {
             this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration()
@@ -615,7 +615,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
         ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
+                new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown"));
         FutureUtil.addTimeoutHandling(future,
                 Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
                 shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
@@ -1425,7 +1425,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     protected synchronized ScheduledExecutorService getCompactorExecutor() {
         if (this.compactorExecutor == null) {
-            compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
+            compactorExecutor = Executors.newSingleThreadScheduledExecutor(
+                    new ExecutorProvider.ExtendedThreadFactory("compaction"));
         }
         return this.compactorExecutor;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9ee1657f137..3d9e6924d11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -24,7 +24,6 @@ import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
@@ -52,6 +51,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -93,7 +93,7 @@ public class TransactionMetadataStoreService {
     private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
 
     private final ThreadFactory threadFactory =
-            new DefaultThreadFactory("transaction-coordinator-thread-factory");
+            new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
 
 
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index d04c64c163a..c14768eed5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.loadbalance.impl;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
@@ -60,6 +59,7 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -212,7 +212,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         loadData = new LoadData();
         loadSheddingPipeline = new ArrayList<>();
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
-        scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = new HashMap<>();
 
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 5b2098b8e4d..c2c0d1947c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -26,7 +26,6 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -55,6 +54,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -189,7 +189,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
     // Perform initializations which may be done without a PulsarService.
     public SimpleLoadManagerImpl() {
         scheduler = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("pulsar-simple-load-manager"));
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-simple-load-manager"));
         this.sortedRankings.set(new TreeMap<>());
         this.currentLoadReports = new HashMap<>();
         this.resourceUnitRankings = new HashMap<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 14fb9a9a4b4..b410fb48b22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -127,6 +127,7 @@ import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.internal.PropertiesUtils;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.BindAddress;
 import org.apache.pulsar.common.configuration.FieldContext;
@@ -311,13 +312,14 @@ public class BrokerService implements Closeable {
         this.topicOrderedExecutor = OrderedExecutor.newBuilder()
                 .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
                 .name("broker-topic-workers").build();
-        final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
+        final DefaultThreadFactory acceptorThreadFactory =
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-acceptor");
 
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
                 pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
         this.workerGroup = eventLoopGroup;
-        this.statsUpdater = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
+        this.statsUpdater = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
         if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
@@ -327,22 +329,22 @@ public class BrokerService implements Closeable {
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
         pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
 
-        this.inactivityMonitor = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-inactivity-monitor"));
-        this.messageExpiryMonitor = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
+        this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-inactivity-monitor"));
+        this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-msg-expiry-monitor"));
         this.compactionMonitor =
                 Executors.newSingleThreadScheduledExecutor(
-                        new DefaultThreadFactory("pulsar-compaction-monitor"));
-        this.consumedLedgersMonitor = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
+                        new ExecutorProvider.ExtendedThreadFactory("pulsar-compaction-monitor"));
+        this.consumedLedgersMonitor = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("consumed-Ledgers-monitor"));
         this.topicPublishRateLimiterMonitor =
                 new PublishRateLimiterMonitor("pulsar-topic-publish-rate-limiter-monitor");
         this.brokerPublishRateLimiterMonitor =
                 new PublishRateLimiterMonitor("pulsar-broker-publish-rate-limiter-monitor");
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
-        this.backlogQuotaChecker = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-backlog-quota-checker"));
         this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
         this.blockedDispatchers =
                 ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
@@ -429,7 +431,8 @@ public class BrokerService implements Closeable {
             bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                     new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
             EventLoopUtil.enableTriggeredMode(bootstrap);
-            DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol);
+            DefaultThreadFactory defaultThreadFactory =
+                    new ExecutorProvider.ExtendedThreadFactory("pulsar-ph-" + protocol);
             EventLoopGroup dedicatedWorkerGroup =
                     EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory);
             bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup));
@@ -551,7 +554,7 @@ public class BrokerService implements Closeable {
         int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
         if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
             this.deduplicationSnapshotMonitor =
-                    Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(
+                    Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(
                             "deduplication-snapshot-monitor"));
             deduplicationSnapshotMonitor.scheduleAtFixedRate(safeRun(() -> forEachTopic(
                     Topic::checkDeduplicationSnapshot))
@@ -685,7 +688,7 @@ public class BrokerService implements Closeable {
                 stop();
             }
             //start monitor.
-            scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(name));
+            scheduler = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(name));
             // schedule task that sums up publish-rate across all cnx on a topic ,
             // and check the rate limit exceeded or not.
             scheduler.scheduleAtFixedRate(safeRun(checkTask), tickTimeMs, tickTimeMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
index 3097d261313..73c0609c556 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.stats.prometheus.metrics;
 
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
 import java.io.IOException;
 import java.io.Writer;
@@ -34,6 +33,7 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 /**
  * A <i>Prometheus</i> based {@link StatsProvider} implementation.
@@ -90,7 +90,8 @@ public class PrometheusMetricsProvider implements StatsProvider {
 
     @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
+        executor = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("metrics"));
 
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
index 9eaf8c1196d..935e3a9f2fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
 import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
 import org.apache.pulsar.broker.tools.LoadReportCommand.Flags;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 
@@ -88,7 +89,8 @@ public class LoadReportCommand extends CliCommand<CliFlags, Flags> {
         spec.console().println("--------------------------------------");
         spec.console().println();
 
-        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("load-report"));
         BrokerHostUsage hostUsage;
         try {
             if (isLinux) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index f2b4449aa0e..94e8026b701 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.base.Strings;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -38,6 +37,7 @@ import org.apache.pulsar.client.api.AutoClusterFailoverBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 @Slf4j
 @Data
@@ -80,7 +80,7 @@ public class AutoClusterFailover implements ServiceUrlProvider {
         this.intervalMs = builder.checkIntervalMs;
         this.resolver = new PulsarServiceNameResolver();
         this.executor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("pulsar-service-provider"));
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index 3fb503cd0f7..4ab1977d0fb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
@@ -44,6 +43,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.AsyncHttpClientConfig;
@@ -74,7 +74,7 @@ public class ControlledClusterFailover implements ServiceUrlProvider {
         this.currentPulsarServiceUrl = builder.defaultServiceUrl;
         this.interval = builder.interval;
         this.executor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("pulsar-service-provider"));
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
 
         this.httpClient = buildHttpClient();
         this.requestBuilder = httpClient.prepareGet(builder.urlProvider)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 08adad46136..d964328d59c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -26,7 +26,6 @@ import com.google.common.cache.LoadingCache;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Clock;
@@ -756,7 +755,7 @@ public class PulsarClientImpl implements PulsarClient {
         // would happen
         CompletableFuture<Void> combinedFuture = FutureUtil.waitForAll(futures);
         ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory("pulsar-client-shutdown-timeout-scheduler"));
+                new ExecutorProvider.ExtendedThreadFactory("pulsar-client-shutdown-timeout-scheduler"));
         FutureUtil.addTimeoutHandling(combinedFuture, Duration.ofSeconds(CLOSE_TIMEOUT_SECONDS),
                 shutdownExecutor, () -> FutureUtil.createTimeoutException("Closing producers and consumers timed out.",
                         PulsarClientImpl.class, "closeAsync"));
@@ -1087,7 +1086,7 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private static ThreadFactory getThreadFactory(String poolName) {
-        return new DefaultThreadFactory(poolName, Thread.currentThread().isDaemon());
+        return new ExecutorProvider.ExtendedThreadFactory(poolName, Thread.currentThread().isDaemon());
     }
 
     void cleanupProducer(ProducerBase<?> producer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index a7a97347002..037aef411a0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -40,10 +40,12 @@ public class ExecutorProvider {
     private final String poolName;
     private volatile boolean isShutdown;
 
-    protected static class ExtendedThreadFactory extends DefaultThreadFactory {
-
+    public static class ExtendedThreadFactory extends DefaultThreadFactory {
         @Getter
         private Thread thread;
+        public ExtendedThreadFactory(String poolName) {
+            super(poolName, false);
+        }
         public ExtendedThreadFactory(String poolName, boolean daemon) {
             super(poolName, daemon);
         }
@@ -51,6 +53,8 @@ public class ExecutorProvider {
         @Override
         public Thread newThread(Runnable r) {
             thread = super.newThread(r);
+            thread.setUncaughtExceptionHandler((t, e) ->
+                    log.error("Thread {} got uncaught Exception", t.getName(), e));
             return thread;
         }
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 988f7ce2e28..c9aea1148b1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import lombok.Getter;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 public class InstanceCache {
 
@@ -33,7 +33,7 @@ public class InstanceCache {
 
     private InstanceCache() {
         ThreadFactory namedThreadFactory =
-                new DefaultThreadFactory("function-timer-thread");
+                new ExecutorProvider.ExtendedThreadFactory("function-timer-thread");
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index f52259c6296..e01b5bc6943 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.functions.worker;
 
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
@@ -30,6 +29,7 @@ import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.util.ExecutorProvider;
 
 @Slf4j
 public class ClusterServiceCoordinator implements AutoCloseable {
@@ -54,7 +54,7 @@ public class ClusterServiceCoordinator implements AutoCloseable {
 
     public ClusterServiceCoordinator(String workerId, LeaderService leaderService, Supplier<Boolean> isLeader) {
         this(workerId, leaderService, isLeader, Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build()));
+                new ExecutorProvider.ExtendedThreadFactory("cluster-service-coordinator-timer")));
     }
 
     @VisibleForTesting