You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/04/16 15:59:54 UTC

[pulsar] branch master updated: Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs (#10199)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 152d1e6  Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs (#10199)
152d1e6 is described below

commit 152d1e69d16b510521003a33ea877ae33f0ce8e5
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 16 18:59:03 2021 +0300

    Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs (#10199)
    
    * Wait for shutdown of BrokerService event loops
    
    * Move CompletableFutureCancellationHandler to pulsar-common util
    
    * Prevent misusage of CompletableFutureCancellationHandler
    
    * Clear cancelAction field after the future completes
    
    - make cancelAction eligible for GC after the future completes (or
      gets cancelled)
    
    * Support cancel signalling when executing multiple futures
    
    * Shutdown BrokerService gracefully which using closeAsync
    
    - shutdown forcefully when closing times out
    
    * Set 100ms to brokerShutdownTimeoutMs used in tests
    
    * Revert changes in MessagingServiceShutdownHook
    
    * Handle CancellationException since it's used in timeouts in BrokerService
    
    * Set shutdown timeout to 0 ms in tests
    
    * Ignore TimeoutException and CancellationException when broker shutdown timeout is 0
    
    * Extract GracefulExecutorServicesShutdown and use it in PulsarService
    
    - handle graceful / forcefully shutdown also for PulsarService executors
    
    * Fix some unclosed PulsarServices
    
    * Set shutdown timeout to 0 in some more tests
    
    * Do some class and method renamings to clarify the code
    
    * Revisit the logic to use awaitTermination
    
    * Use shutdownNow to shutdown the scheduler used for future timeout handling
---
 .../broker/MessagingServiceShutdownHook.java       |   3 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  91 ++++++-----
 .../pulsar/broker/service/BrokerService.java       | 144 +++++++++++++-----
 .../service/GracefulExecutorServicesShutdown.java  | 117 ++++++++++++++
 ...GracefulExecutorServicesTerminationHandler.java | 160 ++++++++++++++++++++
 .../apache/pulsar/broker/PulsarServiceTest.java    |   7 +-
 .../apache/pulsar/broker/SLAMonitoringTest.java    |   6 +-
 .../broker/admin/BrokerAdminClientTlsAuthTest.java |   1 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   5 +
 .../AntiAffinityNamespaceGroupTest.java            |   2 +
 .../loadbalance/LeaderElectionServiceTest.java     |   1 +
 .../broker/loadbalance/LoadBalancerTest.java       |   1 +
 .../loadbalance/ModularLoadManagerImplTest.java    |   3 +
 .../loadbalance/SimpleLoadManagerImplTest.java     |   2 +
 .../OwnerShipForCurrentServerTestBase.java         |   1 +
 .../broker/service/AdvertisedAddressTest.java      |   1 +
 .../broker/service/BacklogQuotaManagerTest.java    |   1 +
 .../pulsar/broker/service/BkEnsemblesTestBase.java |   1 +
 .../broker/service/BrokerBookieIsolationTest.java  |   3 +
 .../GracefulExecutorServicesShutdownTest.java      | 168 +++++++++++++++++++++
 .../pulsar/broker/service/MaxMessageSizeTest.java  |   1 +
 .../PersistentDispatcherFailoverConsumerTest.java  |   1 +
 .../service/PersistentTopicConcurrentTest.java     |   3 +
 .../pulsar/broker/service/PersistentTopicTest.java |   1 +
 .../pulsar/broker/service/ReplicatorTestBase.java  |   1 +
 .../pulsar/broker/service/ServerCnxTest.java       |   1 +
 .../pulsar/broker/service/TopicOwnerTest.java      |   1 +
 .../persistent/PersistentSubscriptionTest.java     |   1 +
 .../broker/transaction/TransactionTestBase.java    |   1 +
 .../coordinator/TransactionMetaStoreTestBase.java  |   1 +
 .../apache/pulsar/broker/web/WebServiceTest.java   |   1 +
 .../pulsar/client/api/BrokerServiceLookupTest.java |   8 +
 .../client/api/ClientDeduplicationFailureTest.java |   1 +
 .../pulsar/client/api/NonPersistentTopicTest.java  |   3 +
 .../pulsar/client/api/ServiceUrlProviderTest.java  |   1 +
 .../worker/PulsarFunctionE2ESecurityTest.java      |   1 +
 .../worker/PulsarFunctionLocalRunTest.java         |   1 +
 .../worker/PulsarFunctionPublishTest.java          |   1 +
 .../functions/worker/PulsarFunctionTlsTest.java    |   1 +
 .../worker/PulsarWorkerAssignmentTest.java         |   1 +
 .../apache/pulsar/io/AbstractPulsarE2ETest.java    |   1 +
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |   1 +
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |   1 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   1 +
 .../client/impl/MultiTopicsConsumerImpl.java       |   1 +
 .../CompletableFutureCancellationHandler.java      |  40 +++--
 .../org/apache/pulsar/common/util/FutureUtil.java  |  45 +++++-
 .../pulsar/common/util/netty/EventLoopUtil.java    |  10 ++
 .../pulsar/common/util/netty/NettyFutureUtil.java  |  85 +++++++++++
 .../CompletableFutureCancellationHandlerTest.java  |   8 +-
 50 files changed, 841 insertions(+), 101 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index 0addf1e..187f4cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -76,7 +77,7 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
             future.get(service.getConfiguration().getBrokerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
 
             LOG.info("Completed graceful shutdown. Exiting");
-        } catch (TimeoutException e) {
+        } catch (TimeoutException | CancellationException e) {
             LOG.warn("Graceful shutdown timeout expired. Closing now");
         } catch (Exception e) {
             LOG.error("Failed to perform graceful shutdown, Exiting anyway", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7d7787c..17bb041 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +50,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -89,6 +92,7 @@ import org.apache.pulsar.broker.protocol.ProtocolHandlers;
 import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
@@ -164,6 +168,7 @@ import org.slf4j.LoggerFactory;
 @Setter(AccessLevel.PROTECTED)
 public class PulsarService implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
+    private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
     private ServiceConfiguration config = null;
     private NamespaceService nsService = null;
     private ManagedLedgerStorage managedLedgerClientFactory = null;
@@ -236,7 +241,7 @@ public class PulsarService implements AutoCloseable {
     private PulsarResources pulsarResources;
 
     public enum State {
-        Init, Started, Closed
+        Init, Started, Closing, Closed
     }
 
     private volatile State state;
@@ -308,10 +313,15 @@ public class PulsarService implements AutoCloseable {
         try {
             closeAsync().get();
         } catch (ExecutionException e) {
-            if (e.getCause() instanceof PulsarServerException) {
-                throw (PulsarServerException) e.getCause();
+            Throwable cause = e.getCause();
+            if (cause instanceof PulsarServerException) {
+                throw (PulsarServerException) cause;
+            } else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
+                    && (cause instanceof TimeoutException || cause instanceof CancellationException)) {
+                // ignore shutdown timeout when timeout is 0, which is primarily used in tests
+                // to forcefully shutdown the broker
             } else {
-                throw new PulsarServerException(e.getCause());
+                throw new PulsarServerException(cause);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -327,6 +337,7 @@ public class PulsarService implements AutoCloseable {
             if (closeFuture != null) {
                 return closeFuture;
             }
+            state = State.Closing;
 
             // close the service in reverse order v.s. in which they are started
             if (this.webService != null) {
@@ -345,6 +356,15 @@ public class PulsarService implements AutoCloseable {
                 this.webSocketService.close();
             }
 
+            GracefulExecutorServicesShutdown executorServicesShutdown =
+                    GracefulExecutorServicesShutdown
+                            .initiate()
+                            .timeout(
+                                    Duration.ofMillis(
+                                            (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+                                                    * getConfiguration()
+                                                    .getBrokerShutdownTimeoutMs())));
+
             List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
             if (this.brokerService != null) {
                 asyncCloseFutures.add(this.brokerService.closeAsync());
@@ -366,7 +386,7 @@ public class PulsarService implements AutoCloseable {
                 this.leaderElectionService = null;
             }
 
-            loadManagerExecutor.shutdown();
+            executorServicesShutdown.shutdown(loadManagerExecutor);
 
             if (globalZkCache != null) {
                 globalZkCache.close();
@@ -402,24 +422,11 @@ public class PulsarService implements AutoCloseable {
                 nsService = null;
             }
 
-            if (compactorExecutor != null) {
-                compactorExecutor.shutdown();
-            }
-
-            if (offloaderScheduler != null) {
-                offloaderScheduler.shutdown();
-            }
-
-            // executor is not initialized in mocks even when real close method is called
-            // guard against null executors
-            if (executor != null) {
-                executor.shutdown();
-            }
-
-            if (orderedExecutor != null) {
-                orderedExecutor.shutdown();
-            }
-            cacheExecutor.shutdown();
+            executorServicesShutdown.shutdown(compactorExecutor);
+            executorServicesShutdown.shutdown(offloaderScheduler);
+            executorServicesShutdown.shutdown(executor);
+            executorServicesShutdown.shutdown(orderedExecutor);
+            executorServicesShutdown.shutdown(cacheExecutor);
 
             LoadManager loadManager = this.loadManager.get();
             if (loadManager != null) {
@@ -441,10 +448,7 @@ public class PulsarService implements AutoCloseable {
                 transactionBufferClient.close();
             }
 
-            if (transactionExecutor != null) {
-                transactionExecutor.shutdown();
-                transactionExecutor = null;
-            }
+            executorServicesShutdown.shutdown(transactionExecutor);
 
             if (coordinationService != null) {
                 coordinationService.close();
@@ -457,20 +461,23 @@ public class PulsarService implements AutoCloseable {
                 configurationMetadataStore.close();
             }
 
-            state = State.Closed;
-            isClosedCondition.signalAll();
+            // add timeout handling for closing executors
+            asyncCloseFutures.add(executorServicesShutdown.handle());
 
-            CompletableFuture<Void> shutdownFuture =
-                    CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]));
-            closeFuture = shutdownFuture;
-            return shutdownFuture;
+            closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
+            closeFuture.handle((v, t) -> {
+                state = State.Closed;
+                isClosedCondition.signalAll();
+                return null;
+            });
+            return closeFuture;
         } catch (Exception e) {
             PulsarServerException pse;
             if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
-                pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
+                pse = new PulsarServerException(MetadataStoreException.unwrap(e));
             } else if (e.getCause() instanceof CompletionException
                     && e.getCause().getCause() instanceof MetadataStoreException) {
-                pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
+                pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
             } else {
                 pse = new PulsarServerException(e);
             }
@@ -480,6 +487,20 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
+    private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
+        ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
+        FutureUtil.addTimeoutHandling(future,
+                Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
+                shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
+        future.handle((v, t) -> {
+            // shutdown the shutdown executor
+            shutdownExecutor.shutdownNow();
+            return null;
+        });
+        return future;
+    }
+
     /**
      * Get the current service configuration.
      *
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 718cb29..c1961f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -54,11 +54,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
@@ -165,6 +167,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
@@ -187,6 +190,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
             FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class,
                     "futureWithDeadline(...)");
+    private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L;
+    private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
+    private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
 
     private final PulsarService pulsar;
     private final ManagedLedgerFactory managedLedgerFactory;
@@ -694,60 +700,118 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 }
             });
 
-            List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
+            CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
+            CompletableFuture<Void> shutdownFuture =
+                    CompletableFuture.allOf(shutdownEventLoopGracefully(acceptorGroup),
+                            shutdownEventLoopGracefully(workerGroup))
+                            .handle((v, t) -> {
+                                if (t != null) {
+                                    log.warn("Error shutting down event loops gracefully", t);
+                                } else {
+                                    log.info("Event loops shutdown completed.");
+                                }
+                                return null;
+                            })
+                            .thenCompose(__ -> {
+                                log.info("Continuing to second phase in shutdown.");
 
-            if (listenChannel != null && listenChannel.isOpen()) {
-                asyncCloseFutures.add(closeChannel(listenChannel));
-            }
+                                List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
 
-            if (listenChannelTls != null && listenChannelTls.isOpen()) {
-                asyncCloseFutures.add(closeChannel(listenChannelTls));
-            }
+                                if (listenChannel != null && listenChannel.isOpen()) {
+                                    asyncCloseFutures.add(closeChannel(listenChannel));
+                                }
 
-            acceptorGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
+                                if (listenChannelTls != null && listenChannelTls.isOpen()) {
+                                    asyncCloseFutures.add(closeChannel(listenChannelTls));
+                                }
 
-            if (interceptor != null) {
-                interceptor.close();
-                interceptor = null;
-            }
+                                if (interceptor != null) {
+                                    interceptor.close();
+                                    interceptor = null;
+                                }
 
-            statsUpdater.shutdown();
-            inactivityMonitor.shutdown();
-            messageExpiryMonitor.shutdown();
-            compactionMonitor.shutdown();
-            messagePublishBufferMonitor.shutdown();
-            consumedLedgersMonitor.shutdown();
-            backlogQuotaChecker.shutdown();
-            authenticationService.close();
-            pulsarStats.close();
-            ClientCnxnAspect.removeListener(zkStatsListener);
-            ClientCnxnAspect.registerExecutor(null);
-            topicOrderedExecutor.shutdown();
-            delayedDeliveryTrackerFactory.close();
-            if (topicPublishRateLimiterMonitor != null) {
-                topicPublishRateLimiterMonitor.shutdown();
-            }
-            if (brokerPublishRateLimiterMonitor != null) {
-                brokerPublishRateLimiterMonitor.shutdown();
-            }
-            if (deduplicationSnapshotMonitor != null) {
-                deduplicationSnapshotMonitor.shutdown();
-            }
+                                try {
+                                    authenticationService.close();
+                                } catch (IOException e) {
+                                    log.warn("Error in closing authenticationService", e);
+                                }
+                                pulsarStats.close();
+                                ClientCnxnAspect.removeListener(zkStatsListener);
+                                ClientCnxnAspect.registerExecutor(null);
+                                try {
+                                    delayedDeliveryTrackerFactory.close();
+                                } catch (IOException e) {
+                                    log.warn("Error in closing delayedDeliveryTrackerFactory", e);
+                                }
 
-            CompletableFuture<Void> shutdownFuture =
-                    CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]))
-                            .thenAccept(__ -> log.info("Broker service completely shut down"));
+                                asyncCloseFutures.add(GracefulExecutorServicesShutdown
+                                        .initiate()
+                                        .timeout(
+                                                Duration.ofMillis(
+                                                        (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+                                                                * pulsar.getConfiguration()
+                                                                .getBrokerShutdownTimeoutMs())))
+                                        .shutdown(
+                                                statsUpdater,
+                                                inactivityMonitor,
+                                                messageExpiryMonitor,
+                                                compactionMonitor,
+                                                messagePublishBufferMonitor,
+                                                consumedLedgersMonitor,
+                                                backlogQuotaChecker,
+                                                topicOrderedExecutor,
+                                                topicPublishRateLimiterMonitor,
+                                                brokerPublishRateLimiterMonitor,
+                                                deduplicationSnapshotMonitor)
+                                        .handle());
+
+                                CompletableFuture<Void> combined =
+                                        FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
+                                cancellableDownstreamFutureReference.complete(combined);
+                                combined.handle((v, t) -> {
+                                    if (t == null) {
+                                        log.info("Broker service completely shut down");
+                                    } else {
+                                        if (t instanceof CancellationException) {
+                                            log.warn("Broker service didn't complete gracefully. "
+                                                    + "Terminating Broker service.");
+                                        } else {
+                                            log.warn("Broker service shut down completed with exception", t);
+                                        }
+                                    }
+                                    return null;
+                                });
+                                return combined;
+                            });
+            FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference
+                    .thenAccept(future -> future.cancel(false)));
             return shutdownFuture;
         } catch (Exception e) {
             return FutureUtil.failedFuture(e);
         }
     }
 
+    CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) {
+        long brokerShutdownTimeoutMs = pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
+        long quietPeriod = Math.min((long) (
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs),
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
+        long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+        return NettyFutureUtil.toCompletableFutureVoid(
+                eventLoopGroup.shutdownGracefully(quietPeriod,
+                        timeout, TimeUnit.MILLISECONDS));
+    }
+
     private CompletableFuture<Void> closeChannel(Channel channel) {
         return ChannelFutures.toCompletableFuture(channel.close())
-                // convert to CompletableFuture<Void>
-                .thenAccept(__ -> {});
+                .handle((c, t) -> {
+                    // log problem if closing of channel fails
+                    // ignore RejectedExecutionException
+                    if (t != null && !(t instanceof RejectedExecutionException)) {
+                        log.warn("Cannot close channel {}", channel, t);
+                    }
+                    return null;
+                });
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
new file mode 100644
index 0000000..1c7134f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This a builder like class for providing a fluent API for graceful shutdown
+ *
+ * Executors are added with the {@link #shutdown(ExecutorService...)}
+ * method. The {@link ExecutorService#shutdown()} method is called immediately.
+ *
+ * Calling the {@link #handle()} method returns a future which completes when all executors
+ * have been terminated.
+ *
+ * The executors will waited for completion with the {@link ExecutorService#awaitTermination(long, TimeUnit)} method.
+ * If the shutdown times out or the future is cancelled, all executors will be terminated and the termination
+ * timeout value will be used for waiting for termination.
+ * The default value for termination timeout is 10% of the shutdown timeout.
+ */
+public class GracefulExecutorServicesShutdown {
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(15);
+    private static final Double DEFAULT_TERMINATION_TIMEOUT_RATIO = 0.1d;
+    private final List<ExecutorService> executorServices = new ArrayList<>();
+    private Duration timeout = DEFAULT_TIMEOUT;
+    private Duration terminationTimeout;
+
+    private GracefulExecutorServicesShutdown() {
+
+    }
+
+    /**
+     * Initiates a new shutdown for one or many {@link ExecutorService}s.
+     *
+     * @return a new instance for controlling graceful shutdown
+     */
+    public static GracefulExecutorServicesShutdown initiate() {
+        return new GracefulExecutorServicesShutdown();
+    }
+
+    /**
+     * Calls {@link ExecutorService#shutdown()} and enlists the executor as part of the
+     * shutdown handling.
+     *
+     * @param executorServices one or many executors to shutdown
+     * @return the current instance for controlling graceful shutdown
+     */
+    public GracefulExecutorServicesShutdown shutdown(ExecutorService... executorServices) {
+        for (ExecutorService executorService : executorServices) {
+            if (executorService != null) {
+                executorService.shutdown();
+                this.executorServices.add(executorService);
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Sets the timeout for graceful shutdown.
+     *
+     * @param timeout duration for the timeout
+     * @return the current instance for controlling graceful shutdown
+     */
+    public GracefulExecutorServicesShutdown timeout(Duration timeout) {
+        this.timeout = timeout;
+        return this;
+    }
+
+    /**
+     * Sets the timeout for waiting for executors to complete in forceful termination.
+     *
+     * @param terminationTimeout duration for the timeout
+     * @return the current instance for controlling graceful shutdown
+     */
+    public GracefulExecutorServicesShutdown terminationTimeout(Duration terminationTimeout) {
+        this.terminationTimeout = terminationTimeout;
+        return this;
+    }
+
+    /**
+     * Starts the handler for polling frequently for the completed termination of enlisted executors.
+     *
+     * If the termination times out or the future is cancelled, all active executors will be forcefully
+     * terminated by calling {@link ExecutorService#shutdownNow()}.
+     *
+     * @return a future which completes when all executors have terminated
+     */
+    public CompletableFuture<Void> handle() {
+        // if termination timeout isn't provided, calculate a termination timeout based on the shutdown timeout
+        if (terminationTimeout == null) {
+            terminationTimeout = Duration.ofNanos((long) (timeout.toNanos() * DEFAULT_TERMINATION_TIMEOUT_RATIO));
+        }
+        return new GracefulExecutorServicesTerminationHandler(timeout, terminationTimeout,
+                executorServices).getFuture();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
new file mode 100644
index 0000000..9abe078
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Waits for termination of {@link ExecutorService}s that have been shutdown.
+ *
+ * The executors will be terminated forcefully after the timeout or when the future is cancelled.
+ *
+ * Designed to be used via the API in {@link GracefulExecutorServicesShutdown}
+ */
+@Slf4j
+class GracefulExecutorServicesTerminationHandler {
+    private static final long SHUTDOWN_THREAD_COMPLETION_TIMEOUT_NANOS = Duration.ofMillis(100L).toNanos();
+    private final List<ExecutorService> executors;
+    private final CompletableFuture<Void> future;
+    private final Duration shutdownTimeout;
+    private final Duration terminationTimeout;
+    private final CountDownLatch shutdownThreadCompletedLatch = new CountDownLatch(1);
+
+    GracefulExecutorServicesTerminationHandler(Duration shutdownTimeout, Duration terminationTimeout,
+                                               List<ExecutorService> executorServices) {
+        this.shutdownTimeout = shutdownTimeout;
+        this.terminationTimeout = terminationTimeout;
+        this.executors = Collections.unmodifiableList(new ArrayList<>(executorServices));
+        this.future = new CompletableFuture<>();
+        log.info("Starting termination handler for {} executors.", executors.size());
+        for (ExecutorService executor : executors) {
+            if (!executor.isShutdown()) {
+                throw new IllegalStateException(
+                        String.format("Executor %s should have been shutdown before entering the termination handler.",
+                                executor));
+            }
+        }
+        if (haveExecutorsBeenTerminated()) {
+            markShutdownCompleted();
+        } else {
+            if (shutdownTimeout.isZero() || shutdownTimeout.isNegative()) {
+                terminateExecutors();
+                markShutdownCompleted();
+            } else {
+                Thread shutdownWaitingThread = new Thread(this::awaitShutdown, getClass().getSimpleName());
+                shutdownWaitingThread.start();
+                FutureUtil.whenCancelledOrTimedOut(future, () -> {
+                    shutdownWaitingThread.interrupt();
+                    waitUntilShutdownWaitingThreadIsCompleted();
+                });
+            }
+        }
+    }
+
+    public CompletableFuture<Void> getFuture() {
+        return future;
+    }
+
+    private boolean haveExecutorsBeenTerminated() {
+        return executors.stream().allMatch(ExecutorService::isTerminated);
+    }
+
+    private void markShutdownCompleted() {
+        log.info("Shutdown completed.");
+        future.complete(null);
+    }
+
+    private void awaitShutdown() {
+        try {
+            awaitTermination(shutdownTimeout);
+            terminateExecutors();
+            markShutdownCompleted();
+        } catch (Exception e) {
+            log.error("Error in termination handler", e);
+            future.completeExceptionally(e);
+        } finally {
+            shutdownThreadCompletedLatch.countDown();
+        }
+    }
+
+    private boolean awaitTermination(Duration timeout) {
+        if (!timeout.isZero() && !timeout.isNegative()) {
+            long awaitUntilNanos = System.nanoTime() + timeout.toNanos();
+            while (!Thread.currentThread().isInterrupted() && System.nanoTime() < awaitUntilNanos) {
+                int activeExecutorsCount = executors.size();
+                for (ExecutorService executor : executors) {
+                    long remainingTimeNanos = awaitUntilNanos - System.nanoTime();
+                    if (remainingTimeNanos > 0) {
+                        try {
+                            if (executor.isTerminated()
+                                    || executor.awaitTermination(remainingTimeNanos, TimeUnit.NANOSECONDS)) {
+                                activeExecutorsCount--;
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+                }
+                if (activeExecutorsCount == 0) {
+                    return true;
+                }
+            }
+        }
+        return haveExecutorsBeenTerminated();
+    }
+
+    private void terminateExecutors() {
+        for (ExecutorService executor : executors) {
+            if (!executor.isTerminated()) {
+                log.info("Shutting down forcefully executor {}", executor);
+                executor.shutdownNow();
+            }
+        }
+        if (!Thread.currentThread().isInterrupted() && !awaitTermination(terminationTimeout)) {
+            for (ExecutorService executor : executors) {
+                if (!executor.isTerminated()) {
+                    log.warn("Executor {} didn't shutdown after waiting for termination.", executor);
+                    for (Runnable runnable : executor.shutdownNow()) {
+                        log.info("Execution in progress for runnable instance of {}: {}", runnable.getClass(),
+                                runnable);
+                    }
+                }
+            }
+        }
+    }
+
+    private void waitUntilShutdownWaitingThreadIsCompleted() {
+        try {
+            shutdownThreadCompletedLatch.await(terminationTimeout.toNanos()
+                    + SHUTDOWN_THREAD_COMPLETION_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 0ff1ca1..a326d57 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.AssertJUnit.assertSame;
 import java.util.Optional;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -33,12 +34,14 @@ import org.testng.annotations.Test;
 public class PulsarServiceTest {
 
     @Test
-    public void testGetWorkerService() {
+    public void testGetWorkerService() throws Exception {
         ServiceConfiguration configuration = new ServiceConfiguration();
         configuration.setZookeeperServers("localhost");
         configuration.setClusterName("clusterName");
         configuration.setFunctionsWorkerEnabled(true);
+        configuration.setBrokerShutdownTimeoutMs(0L);
         WorkerService expectedWorkerService = mock(WorkerService.class);
+        @Cleanup
         PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(),
                 Optional.of(expectedWorkerService), (exitCode) -> {}));
 
@@ -56,6 +59,8 @@ public class PulsarServiceTest {
         configuration.setZookeeperServers("localhost");
         configuration.setClusterName("clusterName");
         configuration.setFunctionsWorkerEnabled(false);
+        configuration.setBrokerShutdownTimeoutMs(0L);
+        @Cleanup
         PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(),
                 Optional.empty(), (exitCode) -> {});
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 7e09ab9..4437b43 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -22,7 +22,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import java.net.URL;
 import java.util.HashSet;
 import java.util.List;
@@ -33,9 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -79,12 +76,13 @@ public class SLAMonitoringTest {
         // start brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
             ServiceConfiguration config = new ServiceConfiguration();
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setBrokerServicePort(Optional.of(0));
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
-            config.setBrokerServicePort(Optional.of(0));
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             configurations[i] = config;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
index 24a32a0..bbc70d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -113,6 +113,7 @@ public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
 
         /***** Start Broker 2 ******/
         ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setBrokerShutdownTimeoutMs(0L);
         conf.setBrokerServicePort(Optional.of(0));
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ed6db6a..3ccf925 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -157,6 +157,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
     }
 
     protected void doInitConf() throws Exception {
+        this.conf.setBrokerShutdownTimeoutMs(0L);
         this.conf.setBrokerServicePort(Optional.of(0));
         this.conf.setBrokerServicePortTls(Optional.of(0));
         this.conf.setAdvertisedAddress("localhost");
@@ -243,6 +244,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
     protected void stopBroker() throws Exception {
         log.info("Stopping Pulsar broker. brokerServiceUrl: {} webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
                 pulsar.getWebServiceAddress());
+        // set shutdown timeout to 0 for forceful shutdown
+        pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0L);
         pulsar.close();
         pulsar = null;
         // Simulate cleanup of ephemeral nodes
@@ -276,6 +279,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
     }
 
     protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
+        conf.setBrokerShutdownTimeoutMs(0L);
         PulsarService pulsar = spy(new PulsarService(conf));
         setupBrokerMocks(pulsar);
         pulsar.start();
@@ -431,6 +435,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
         configuration.setZookeeperServers("localhost:2181");
         configuration.setConfigurationStoreServers("localhost:3181");
         configuration.setAllowAutoTopicCreationType("non-partitioned");
+        configuration.setBrokerShutdownTimeoutMs(0L);
         configuration.setBrokerServicePort(Optional.of(0));
         configuration.setBrokerServicePortTls(Optional.of(0));
         configuration.setWebServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index cbb8e40..a827912 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -111,6 +111,7 @@ public class AntiAffinityNamespaceGroupTest {
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
         config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config1.setBrokerShutdownTimeoutMs(0L);
         config1.setBrokerServicePort(Optional.of(0));
         config1.setFailureDomainsEnabled(true);
         config1.setLoadBalancerEnabled(true);
@@ -131,6 +132,7 @@ public class AntiAffinityNamespaceGroupTest {
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
         config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config2.setBrokerShutdownTimeoutMs(0L);
         config2.setBrokerServicePort(Optional.of(0));
         config2.setFailureDomainsEnabled(true);
         config2.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 581531d..5f0a1fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -67,6 +67,7 @@ public class LeaderElectionServiceTest {
             PulsarAdminException {
         final String clusterName = "elect-test";
         ServiceConfiguration config = new ServiceConfiguration();
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setWebServicePort(Optional.of(0));
         config.setClusterName(clusterName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 84750da..83c8e7e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -141,6 +141,7 @@ public class LoadBalancerTest {
             config.setBrokerServicePortTls(Optional.of(0));
             config.setWebServicePortTls(Optional.of(0));
             config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setBrokerServicePort(Optional.of(0));
             config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
             config.setAdvertisedAddress(localhost+i);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 3c6d0f4..d9ebda7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -147,6 +147,7 @@ public class ModularLoadManagerImplTest {
         config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
 
         config1.setAdvertisedAddress("localhost");
+        config1.setBrokerShutdownTimeoutMs(0L);
         config1.setBrokerServicePort(Optional.of(0));
         config1.setBrokerServicePortTls(Optional.of(0));
         config1.setWebServicePortTls(Optional.of(0));
@@ -164,6 +165,7 @@ public class ModularLoadManagerImplTest {
         config2.setWebServicePort(Optional.of(0));
         config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
         config2.setAdvertisedAddress("localhost");
+        config2.setBrokerShutdownTimeoutMs(0L);
         config2.setBrokerServicePort(Optional.of(0));
         config2.setBrokerServicePortTls(Optional.of(0));
         config2.setWebServicePortTls(Optional.of(0));
@@ -593,6 +595,7 @@ public class ModularLoadManagerImplTest {
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         PulsarService pulsar = new PulsarService(config);
         // create znode using different zk-session
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index cced810..4307955 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -122,6 +122,7 @@ public class SimpleLoadManagerImplTest {
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
         config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config1.setBrokerShutdownTimeoutMs(0L);
         config1.setBrokerServicePort(Optional.of(0));
         config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config1.setBrokerServicePortTls(Optional.of(0));
@@ -140,6 +141,7 @@ public class SimpleLoadManagerImplTest {
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
         config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config2.setBrokerShutdownTimeoutMs(0L);
         config2.setBrokerServicePort(Optional.of(0));
         config2.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config2.setBrokerServicePortTls(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index ec9fde5..c1d15eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -109,6 +109,7 @@ public class OwnerShipForCurrentServerTestBase {
             conf.setBookkeeperClientExposeStatsToPrometheus(true);
             conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
 
+            conf.setBrokerShutdownTimeoutMs(0L);
             conf.setBrokerServicePort(Optional.of(0));
             conf.setBrokerServicePortTls(Optional.of(0));
             conf.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index c5e11c8..592bfff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -47,6 +47,7 @@ public class AdvertisedAddressTest {
         bkEnsemble.start();
 
         ServiceConfiguration config = new ServiceConfiguration();
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
         config.setWebServicePort(Optional.ofNullable(0));
         config.setClusterName("usc");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 1be4aea..c310196 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -82,6 +82,7 @@ public class BacklogQuotaManagerTest {
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setClusterName("usc");
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setBrokerServicePort(Optional.of(0));
             config.setAuthorizationEnabled(false);
             config.setAuthenticationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index c3e2fe2..2140190 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -77,6 +77,7 @@ public abstract class BkEnsemblesTestBase extends TestRetrySupport {
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setClusterName("usc");
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setBrokerServicePort(Optional.of(0));
             config.setAuthorizationEnabled(false);
             config.setAuthenticationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index a984bac..b30150c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -146,6 +146,7 @@ public class BrokerBookieIsolationTest {
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setAdvertisedAddress("localhost");
         config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
@@ -290,6 +291,7 @@ public class BrokerBookieIsolationTest {
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setAdvertisedAddress("localhost");
         config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
@@ -414,6 +416,7 @@ public class BrokerBookieIsolationTest {
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setAdvertisedAddress("localhost");
         config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
new file mode 100644
index 0000000..b978507
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.testng.annotations.Test;
+
+public class GracefulExecutorServicesShutdownTest {
+
+    @Test
+    public void shouldShutdownExecutorsImmediately() throws ExecutionException, InterruptedException, TimeoutException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isTerminated()).thenReturn(true);
+        when(executorService.isShutdown()).thenReturn(true);
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+
+        // then
+        verify(executorService, atLeastOnce()).shutdown();
+        future.get(1, TimeUnit.SECONDS);
+        verify(executorService, never()).shutdownNow();
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void shouldTerminateExecutorOnTimeout() throws ExecutionException, InterruptedException, TimeoutException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        shutdown.timeout(Duration.ofMillis(500));
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(true);
+        AtomicBoolean terminated = new AtomicBoolean();
+        when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get());
+        when(executorService.shutdownNow()).thenAnswer(invocation -> {
+           terminated.set(true);
+           return null;
+        });
+        when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation  -> {
+            long timeout = invocation.getArgument(0);
+            TimeUnit unit = invocation.getArgument(1);
+            Thread.sleep(unit.toMillis(timeout));
+            return terminated.get();
+        });
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+
+        // then
+        future.get(1, TimeUnit.SECONDS);
+        verify(executorService, atLeastOnce()).shutdownNow();
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void shouldWaitForExecutorToTerminate() throws ExecutionException, InterruptedException, TimeoutException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        shutdown.timeout(Duration.ofMillis(500));
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(true);
+        AtomicBoolean terminated = new AtomicBoolean();
+        when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get());
+        when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation  -> {
+            long timeout = invocation.getArgument(0);
+            // wait half the time to simulate the termination completing
+            timeout = timeout / 2;
+            TimeUnit unit = invocation.getArgument(1);
+            Thread.sleep(unit.toMillis(timeout));
+            terminated.set(true);
+            return terminated.get();
+        });
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+
+        // then
+        future.get(1, TimeUnit.SECONDS);
+        verify(executorService, times(1)).awaitTermination(anyLong(), any());
+        verify(executorService, never()).shutdownNow();
+        assertTrue(future.isDone());
+    }
+
+
+    @Test
+    public void shouldTerminateWhenFutureIsCancelled() throws InterruptedException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        shutdown.timeout(Duration.ofMillis(15000));
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(true);
+        AtomicBoolean terminated = new AtomicBoolean();
+        AtomicBoolean awaitTerminationInterrupted = new AtomicBoolean();
+        when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get());
+        when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation  -> {
+            long timeout = invocation.getArgument(0);
+            TimeUnit unit = invocation.getArgument(1);
+            try {
+                Thread.sleep(unit.toMillis(timeout));
+            } catch (InterruptedException e) {
+                awaitTerminationInterrupted.set(true);
+                Thread.currentThread().interrupt();
+                throw e;
+            }
+            throw new IllegalStateException("Thread.sleep should have been interrupted");
+        });
+        when(executorService.shutdownNow()).thenAnswer(invocation -> {
+            terminated.set(true);
+            return null;
+        });
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+        future.cancel(false);
+
+        // then
+        assertTrue(awaitTerminationInterrupted.get(),
+                "awaitTermination should have been interrupted");
+        verify(executorService, times(1)).awaitTermination(anyLong(), any());
+        verify(executorService, times(1)).shutdownNow();
+    }
+
+    @Test
+    public void shouldAcceptNullReferenceAndIgnoreIt() {
+        ExecutorService executorService = null;
+        CompletableFuture<Void> future = GracefulExecutorServicesShutdown.initiate()
+                .shutdown(executorService)
+                .handle();
+        assertTrue(future.isDone());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index c91f5da..b0dcb83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -63,6 +63,7 @@ public class MaxMessageSizeTest {
             configuration.setAdvertisedAddress("localhost");
             configuration.setWebServicePort(Optional.of(0));
             configuration.setClusterName("max_message_test");
+            configuration.setBrokerShutdownTimeoutMs(0L);
             configuration.setBrokerServicePort(Optional.of(0));
             configuration.setAuthorizationEnabled(false);
             configuration.setAuthenticationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index f524b98..e715738 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -112,6 +112,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     @BeforeMethod
     public void setup() throws Exception {
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
         pulsar = spy(new PulsarService(svcConfig));
         doReturn(svcConfig).when(pulsar).getConfiguration();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 4179a63..875566f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -79,6 +80,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
     public void setup(Method m) throws Exception {
         super.setUp(m);
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
+        @Cleanup
         PulsarService pulsar = spy(new PulsarService(svcConfig));
         doReturn(svcConfig).when(pulsar).getConfiguration();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 4865613..f09e31f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -164,6 +164,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         executor = OrderedExecutor.newBuilder().numThreads(1).build();
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         svcConfig.setAdvertisedAddress("localhost");
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
         pulsar = spy(new PulsarService(svcConfig));
         doReturn(svcConfig).when(pulsar).getConfiguration();
         doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 20b98e0..aeaacf9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -209,6 +209,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
         config.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
         config.setBrokerDeleteInactiveTopicsFrequencySeconds(
                 inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setBrokerServicePortTls(Optional.of(0));
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 88ec8da..c5a9335 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -152,6 +152,7 @@ public class ServerCnxTest {
     public void setup() throws Exception {
         executor = OrderedExecutor.newBuilder().numThreads(1).build();
         svcConfig = spy(new ServiceConfiguration());
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
         pulsar = spy(new PulsarService(svcConfig));
         doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 7e47559..62ec5f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -95,6 +95,7 @@ public class TopicOwnerTest {
         // start brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
             ServiceConfiguration config = new ServiceConfiguration();
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setBrokerServicePort(Optional.of(0));
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 455a3f6..bfa678a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -110,6 +110,7 @@ public class PersistentSubscriptionTest {
         executor = Executors.newSingleThreadExecutor();
 
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setTransactionCoordinatorEnabled(true);
         pulsarMock = spy(new PulsarService(svcConfig));
         doReturn(new InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 8fc1fd1..62b0aed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -119,6 +119,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             conf.setBookkeeperClientExposeStatsToPrometheus(true);
             conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
 
+            conf.setBrokerShutdownTimeoutMs(0L);
             conf.setBrokerServicePort(Optional.of(0));
             conf.setBrokerServicePortTls(Optional.of(0));
             conf.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 16cc475..0457907 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -64,6 +64,7 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
         // start brokers
         for (int i = 0; i < BROKER_COUNT; i++) {
             ServiceConfiguration config = new ServiceConfiguration();
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setBrokerServicePort(Optional.of(0));
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index c37d24a..24dd62d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -337,6 +337,7 @@ public class WebServiceTest {
 
         ServiceConfiguration config = new ServiceConfiguration();
         config.setAdvertisedAddress("localhost");
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setWebServicePort(Optional.of(0));
         if (enableTls) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index ae89a55..383cd08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -135,6 +135,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         /**** start broker-2 ****/
         ServiceConfiguration conf2 = new ServiceConfiguration();
+        conf2.setBrokerShutdownTimeoutMs(0L);
         conf2.setBrokerServicePort(Optional.of(0));
         conf2.setWebServicePort(Optional.of(0));
         conf2.setAdvertisedAddress("localhost");
@@ -212,6 +213,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         final String property = "my-property2";
         ServiceConfiguration conf2 = new ServiceConfiguration();
         conf2.setAdvertisedAddress("localhost");
+        conf2.setBrokerShutdownTimeoutMs(0L);
         conf2.setBrokerServicePort(Optional.of(0));
         conf2.setWebServicePort(Optional.of(0));
         conf2.setAdvertisedAddress("localhost");
@@ -301,6 +303,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         /**** start broker-2 ****/
         ServiceConfiguration conf2 = new ServiceConfiguration();
         conf2.setAdvertisedAddress("localhost");
+        conf2.setBrokerShutdownTimeoutMs(0L);
         conf2.setBrokerServicePort(Optional.of(0));
         conf2.setWebServicePort(Optional.of(0));
         conf2.setAdvertisedAddress("localhost");
@@ -376,7 +379,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
 
         /**** start broker-2 ****/
         ServiceConfiguration conf2 = new ServiceConfiguration();
+        conf2.setBrokerShutdownTimeoutMs(0L);
         conf2.setAdvertisedAddress("localhost");
+        conf2.setBrokerShutdownTimeoutMs(0L);
         conf2.setBrokerServicePort(Optional.of(0));
         conf2.setBrokerServicePortTls(Optional.of(0));
         conf2.setWebServicePort(Optional.of(0));
@@ -810,6 +815,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         // (1) Start broker-1
         ServiceConfiguration conf2 = new ServiceConfiguration();
         conf2.setAdvertisedAddress("localhost");
+        conf2.setBrokerShutdownTimeoutMs(0L);
         conf2.setBrokerServicePort(Optional.of(0));
         conf2.setWebServicePort(Optional.of(0));
         conf2.setAdvertisedAddress("localhost");
@@ -913,7 +919,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             final String namespace = "my-property/my-ns";
             // (1) Start broker-1
             ServiceConfiguration conf2 = new ServiceConfiguration();
+            conf2.setBrokerShutdownTimeoutMs(0L);
             conf2.setAdvertisedAddress("localhost");
+            conf2.setBrokerShutdownTimeoutMs(0L);
             conf2.setBrokerServicePort(Optional.of(0));
             conf2.setWebServicePort(Optional.of(0));
             conf2.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index a3ec437..18de76e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -81,6 +81,7 @@ public class ClientDeduplicationFailureTest {
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config.setTlsAllowInsecureConnection(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 74ca556..deaee35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -941,6 +941,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
             config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
                     inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+            config1.setBrokerShutdownTimeoutMs(0L);
             config1.setBrokerServicePort(Optional.of(0));
             config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
             config1.setAllowAutoTopicCreationType("non-partitioned");
@@ -966,6 +967,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
             config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
                     inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+            config2.setBrokerShutdownTimeoutMs(0L);
             config2.setBrokerServicePort(Optional.of(0));
             config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
             config2.setAllowAutoTopicCreationType("non-partitioned");
@@ -991,6 +993,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
             config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
                     inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+            config3.setBrokerShutdownTimeoutMs(0L);
             config3.setBrokerServicePort(Optional.of(0));
             config3.setAllowAutoTopicCreationType("non-partitioned");
             pulsar3 = new PulsarService(config3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 4493698..2b358dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -106,6 +106,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
                 .subscribe();
 
         PulsarService pulsarService1 = pulsar;
+        conf.setBrokerShutdownTimeoutMs(0L);
         conf.setBrokerServicePort(Optional.of(0));
         conf.setWebServicePort(Optional.of(0));
         restartBroker();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 7085832..87a664f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -129,6 +129,7 @@ public class PulsarFunctionE2ESecurityTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 365b81f..5dc5197 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -191,6 +191,7 @@ public class PulsarFunctionLocalRunTest {
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setBrokerServicePortTls(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index b6474b6..f21aabeb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -126,6 +126,7 @@ public class PulsarFunctionPublishTest {
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setBrokerServicePortTls(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 61e7e61..e23c25e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -89,6 +89,7 @@ public class PulsarFunctionTlsTest {
             int webPort = PortManager.nextFreePort();
 
             ServiceConfiguration config = new ServiceConfiguration();
+            config.setBrokerShutdownTimeoutMs(0L);
             config.setWebServicePort(Optional.empty());
             config.setWebServicePortTls(Optional.of(webPort));
             config.setBrokerServicePort(Optional.empty());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 2c2e333..b288911 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -92,6 +92,7 @@ public class PulsarWorkerAssignmentTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 2bee28b..d5d83bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -127,6 +127,7 @@ public abstract class AbstractPulsarE2ETest {
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setBrokerServicePortTls(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 6d41c33..63f57f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -100,6 +100,7 @@ public class PulsarFunctionAdminTest {
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setBrokerServicePort(Optional.of(0));
         config.setBrokerServicePortTls(Optional.of(0));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 6dad9a4..ac96b1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -107,6 +107,7 @@ public class PulsarFunctionTlsTest {
         bkEnsemble.start();
 
         config = spy(new ServiceConfiguration());
+        config.setBrokerShutdownTimeoutMs(0L);
         config.setClusterName("use");
         Set<String> superUsers = Sets.newHashSet("superUser", "admin");
         config.setSuperUserRoles(superUsers);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d53ecfe..ab1d44e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7902178..8033c18 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandler.java
similarity index 74%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandler.java
index 1deb387..f2de511 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
 
 import java.util.Objects;
 import java.util.concurrent.CancellationException;
@@ -41,10 +41,16 @@ import java.util.function.BiConsumer;
  * any "downstream" dependent futures. A cancellation or timeout that happens in any "upstream"
  * future will get handled.
  */
-class CompletableFutureCancellationHandler {
-    private volatile boolean cancelled;
+public class CompletableFutureCancellationHandler {
+    private enum CompletionStatus {
+        PENDING,
+        CANCELLED,
+        DONE
+    }
+    private volatile CompletionStatus completionStatus = CompletionStatus.PENDING;
     private volatile Runnable cancelAction;
     private final AtomicBoolean cancelHandled = new AtomicBoolean();
+    private boolean attached;
 
     /**
      * Creates a new {@link CompletableFuture} and attaches the cancellation handler
@@ -61,12 +67,15 @@ class CompletableFutureCancellationHandler {
 
     /**
      * Attaches the cancellation handler to handle cancels
-     * and timeouts
+     * and timeouts. A cancellation handler instance can be used only once.
      *
      * @param future the future to attach the handler to
-     * @param <T>    the result type of the future
      */
-    public <T> void attachToFuture(CompletableFuture<T> future) {
+    public synchronized void attachToFuture(CompletableFuture<?> future) {
+        if (attached) {
+            throw new IllegalStateException("A future has already been attached to this instance.");
+        }
+        attached = true;
         future.whenComplete(whenCompleteFunction());
     }
 
@@ -81,7 +90,7 @@ class CompletableFutureCancellationHandler {
      * @param cancelAction the action to run when the the future gets cancelled or timeouts
      */
     public void setCancelAction(Runnable cancelAction) {
-        if (this.cancelAction != null) {
+        if (this.cancelAction != null || cancelHandled.get()) {
             throw new IllegalStateException("cancelAction can only be set once.");
         }
         this.cancelAction = Objects.requireNonNull(cancelAction);
@@ -89,18 +98,25 @@ class CompletableFutureCancellationHandler {
         runCancelActionOnceIfCancelled();
     }
 
-    private <T> BiConsumer<? super T, ? super Throwable> whenCompleteFunction() {
-        return (T t, Throwable throwable) -> {
+    private BiConsumer<Object, ? super Throwable> whenCompleteFunction() {
+        return (v, throwable) -> {
             if (throwable instanceof CancellationException || throwable instanceof TimeoutException) {
-                cancelled = true;
+                completionStatus = CompletionStatus.CANCELLED;
+            } else {
+                completionStatus = CompletionStatus.DONE;
             }
             runCancelActionOnceIfCancelled();
         };
     }
 
     private void runCancelActionOnceIfCancelled() {
-        if (cancelled && cancelAction != null && cancelHandled.compareAndSet(false, true)) {
-            cancelAction.run();
+        if (completionStatus != CompletionStatus.PENDING && cancelAction != null &&
+                cancelHandled.compareAndSet(false, true)) {
+            if (completionStatus == CompletionStatus.CANCELLED) {
+                cancelAction.run();
+            }
+            // clear cancel action reference when future completes
+            cancelAction = null;
         }
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 53b6deb..7356950 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -36,13 +36,52 @@ public class FutureUtil {
     /**
      * Return a future that represents the completion of the futures in the provided list.
      *
-     * @param futures
-     * @return
+     * @param futures futures to wait for
+     * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
      */
-    public static <T> CompletableFuture<Void> waitForAll(List<CompletableFuture<T>> futures) {
+    public static CompletableFuture<Void> waitForAll(List<? extends CompletableFuture<?>> futures) {
         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
+
+    /**
+     * Return a future that represents the completion of the futures in the provided list.
+     * The future will support {@link CompletableFuture#cancel(boolean)}. It will cancel
+     * all unfinished futures when the future gets cancelled.
+     *
+     * @param futures futures to wait for
+     * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
+     */
+    public static CompletableFuture<Void> waitForAllAndSupportCancel(List<? extends CompletableFuture<?>> futures) {
+        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]);
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futuresArray);
+        whenCancelledOrTimedOut(combinedFuture, () -> {
+            for (CompletableFuture completableFuture : futuresArray) {
+                if (!completableFuture.isDone()) {
+                    completableFuture.cancel(false);
+                }
+            }
+        });
+        return combinedFuture;
+    }
+
+    /**
+     * If the future is cancelled or times out, the cancel action will be
+     * invoked
+     *
+     * The action is executed once if the future completes with
+     * {@link java.util.concurrent.CancellationException} or {@link TimeoutException}
+     *
+     * @param future future to attach the action to
+     * @param cancelAction action to invoke if the future is cancelled or times out
+     */
+    public static void whenCancelledOrTimedOut(CompletableFuture<?> future, Runnable cancelAction) {
+        CompletableFutureCancellationHandler cancellationHandler =
+                new CompletableFutureCancellationHandler();
+        cancellationHandler.setCancelAction(cancelAction);
+        cancellationHandler.attachToFuture(future);
+    }
+
     public static <T> CompletableFuture<T> failedFuture(Throwable t) {
         CompletableFuture<T> future = new CompletableFuture<>();
         future.completeExceptionally(t);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index 8e10bcc..a451dff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -34,6 +34,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 
 @SuppressWarnings("checkstyle:JavadocType")
@@ -86,4 +87,13 @@ public class EventLoopUtil {
             bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
         }
     }
+
+    /**
+     * Shutdowns the EventLoopGroup gracefully. Returns a {@link CompletableFuture}
+     * @param eventLoopGroup the event loop to shutdown
+     * @return CompletableFuture that completes when the shutdown has completed
+     */
+    public static CompletableFuture<Void> shutdownGracefully(EventLoopGroup eventLoopGroup) {
+        return NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully());
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
new file mode 100644
index 0000000..df9d24f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.util.concurrent.Future;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Contains utility methods for working with Netty Futures
+ */
+public class NettyFutureUtil {
+    /**
+     * Converts a Netty {@link Future} to {@link CompletableFuture}
+     *
+     * @param future Netty future
+     * @param <V>    value type
+     * @return converted future instance
+     */
+    public static <V> CompletableFuture<V> toCompletableFuture(Future<V> future) {
+        Objects.requireNonNull(future, "future cannot be null");
+
+        CompletableFuture<V> adapter = new CompletableFuture<>();
+        if (future.isDone()) {
+            if (future.isSuccess()) {
+                adapter.complete(future.getNow());
+            } else {
+                adapter.completeExceptionally(future.cause());
+            }
+        } else {
+            future.addListener((Future<V> f) -> {
+                if (f.isSuccess()) {
+                    adapter.complete(f.getNow());
+                } else {
+                    adapter.completeExceptionally(f.cause());
+                }
+            });
+        }
+        return adapter;
+    }
+
+    /**
+     * Converts a Netty {@link Future} to {@link CompletableFuture} with Void type
+     *
+     * @param future Netty future
+     * @return converted future instance
+     */
+    public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> future) {
+        Objects.requireNonNull(future, "future cannot be null");
+
+        CompletableFuture<Void> adapter = new CompletableFuture<>();
+        if (future.isDone()) {
+            if (future.isSuccess()) {
+                adapter.complete(null);
+            } else {
+                adapter.completeExceptionally(future.cause());
+            }
+        } else {
+            future.addListener(f -> {
+                if (f.isSuccess()) {
+                    adapter.complete(null);
+                } else {
+                    adapter.completeExceptionally(f.cause());
+                }
+            });
+        }
+        return adapter;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandlerTest.java
similarity index 98%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandlerTest.java
index 9582b73..337e607 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandlerTest.java
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
-
-import org.testng.annotations.Test;
+package org.apache.pulsar.common.util;
 
+import static org.testng.Assert.assertTrue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
 
 public class CompletableFutureCancellationHandlerTest {