You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/28 17:21:39 UTC

[pulsar] branch master updated: Don't run Runtime#halt in tests (#7066)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bb7f47f  Don't run Runtime#halt in tests (#7066)
bb7f47f is described below

commit bb7f47fca2cb5e284346d510c0ccec891fa2ff43
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 28 10:21:21 2020 -0700

    Don't run Runtime#halt in tests (#7066)
    
    When you trigger Runtime#halt from a test, as was happening on ZK
    session loss, it kills the test without reporting a result. We
    shouldn't do that. Instead, reserve halt for production code, and just
    log for tests.
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
---
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  8 ++++-
 .../java/org/apache/pulsar/PulsarStandalone.java   |  7 +++-
 .../broker/MessagingServiceShutdownHook.java       | 11 ++++---
 .../apache/pulsar/broker/NoOpShutdownService.java  | 38 ----------------------
 .../org/apache/pulsar/broker/PulsarService.java    | 17 +++++-----
 .../apache/pulsar/broker/SLAMonitoringTest.java    |  2 --
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  2 --
 .../AntiAffinityNamespaceGroupTest.java            |  3 --
 .../broker/loadbalance/LoadBalancerTest.java       |  2 --
 .../loadbalance/ModularLoadManagerImplTest.java    |  4 ---
 .../loadbalance/SimpleLoadManagerImplTest.java     |  3 --
 .../broker/service/AdvertisedAddressTest.java      |  2 --
 .../broker/service/BacklogQuotaManagerTest.java    |  2 --
 .../pulsar/broker/service/BkEnsemblesTestBase.java |  2 --
 .../broker/service/BrokerBookieIsolationTest.java  |  4 ---
 .../pulsar/broker/service/MaxMessageSizeTest.java  |  2 --
 .../PersistentDispatcherFailoverConsumerTest.java  |  2 --
 .../service/PersistentTopicConcurrentTest.java     |  2 --
 .../pulsar/broker/service/PersistentTopicTest.java |  2 --
 .../pulsar/broker/service/ReplicatorTestBase.java  |  4 ---
 .../pulsar/broker/service/ServerCnxTest.java       |  2 --
 .../pulsar/broker/service/TopicOwnerTest.java      |  2 --
 .../persistent/PersistentSubscriptionTest.java     |  2 --
 .../buffer/PersistentTransactionBufferTest.java    |  2 --
 .../coordinator/TransactionMetaStoreTestBase.java  |  2 --
 .../apache/pulsar/broker/web/WebServiceTest.java   |  2 --
 .../client/api/ClientDeduplicationFailureTest.java |  4 +--
 .../pulsar/client/api/NonPersistentTopicTest.java  |  4 ---
 .../worker/PulsarFunctionE2ESecurityTest.java      |  4 +--
 .../worker/PulsarFunctionLocalRunTest.java         |  6 ++--
 .../worker/PulsarFunctionPublishTest.java          |  6 ++--
 .../worker/PulsarWorkerAssignmentTest.java         |  6 ++--
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |  6 ++--
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |  4 +--
 34 files changed, 40 insertions(+), 131 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 42a2cf1..c5b1aa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -207,7 +207,13 @@ public class PulsarBrokerStarter {
             }
 
             // init pulsar service
-            pulsarService = new PulsarService(brokerConfig, Optional.ofNullable(functionsWorkerService));
+            pulsarService = new PulsarService(brokerConfig,
+                                              Optional.ofNullable(functionsWorkerService),
+                                              (exitCode) -> {
+                                                  log.info("Halting broker process with code {}",
+                                                           exitCode);
+                                                  Runtime.getRuntime().halt(exitCode);
+                                              });
 
             // if no argument to run bookie in cmd line, read from pulsar config
             if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index be1b276..3fab76f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -315,7 +315,12 @@ public class PulsarStandalone implements AutoCloseable {
         }
 
         // Start Broker
-        broker = new PulsarService(config, Optional.ofNullable(fnWorkerService));
+        broker = new PulsarService(config,
+                                   Optional.ofNullable(fnWorkerService),
+                                   (exitCode) -> {
+                                       log.info("Halting standalone process with code {}", exitCode);
+                                       Runtime.getRuntime().halt(exitCode);
+                                   });
         broker.start();
 
         broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index e09a589..6d1768c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
@@ -40,9 +41,11 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
     private static final String LogbackLoggerContextClassName = "ch.qos.logback.classic.LoggerContext";
 
     private PulsarService service = null;
+    private final Consumer<Integer> processTerminator;
 
-    public MessagingServiceShutdownHook(PulsarService service) {
+    public MessagingServiceShutdownHook(PulsarService service, Consumer<Integer> processTerminator) {
         this.service = service;
+        this.processTerminator = processTerminator;
     }
 
     @Override
@@ -76,8 +79,9 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
         } finally {
 
             immediateFlushBufferedLogs();
+
             // always put system to halt immediately
-            Runtime.getRuntime().halt(0);
+            processTerminator.accept(0);
         }
     }
 
@@ -96,8 +100,7 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
 
         LOG.info("Invoking Runtime.halt({})", exitCode);
         immediateFlushBufferedLogs();
-        Runtime.getRuntime().halt(exitCode);
-
+        processTerminator.accept(exitCode);
     }
 
     public static void immediateFlushBufferedLogs() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/NoOpShutdownService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/NoOpShutdownService.java
deleted file mode 100644
index 6c70e9a..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/NoOpShutdownService.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
-
-@Slf4j
-public class NoOpShutdownService implements ShutdownService {
-
-    @Override
-    public void run() {
-        shutdown(0);
-    }
-
-    @Override
-    public void shutdown(int exitCode) {
-        log.warn("Invoked shutdown with exitCode={}", exitCode);
-    }
-
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 21a73a0..2584955 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import lombok.AccessLevel;
@@ -188,7 +189,7 @@ public class PulsarService implements AutoCloseable {
     private final Optional<WorkerService> functionWorkerService;
     private ProtocolHandlers protocolHandlers = null;
 
-    private ShutdownService shutdownService;
+    private final ShutdownService shutdownService;
 
     private MetricsGenerator metricsGenerator;
     private TransactionMetadataStoreService transactionMetadataStoreService;
@@ -203,10 +204,14 @@ public class PulsarService implements AutoCloseable {
     private final Condition isClosedCondition = mutex.newCondition();
 
     public PulsarService(ServiceConfiguration config) {
-        this(config, Optional.empty());
+        this(config, Optional.empty(), (exitCode) -> {
+                LOG.info("Process termination requested with code {}. "
+                         + "Ignoring, as this constructor is intended for tests. ", exitCode);
+            });
     }
 
-    public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService) {
+    public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService,
+                         Consumer<Integer> processTerminator) {
         // Validate correctness of configuration
         PulsarConfigurationLoader.isComplete(config);
 
@@ -215,7 +220,7 @@ public class PulsarService implements AutoCloseable {
         this.advertisedAddress = advertisedAddress(config);
         this.brokerVersion = PulsarVersion.getVersion();
         this.config = config;
-        this.shutdownService = new MessagingServiceShutdownHook(this);
+        this.shutdownService = new MessagingServiceShutdownHook(this, processTerminator);
         this.loadManagerExecutor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
         this.functionWorkerService = functionWorkerService;
@@ -1029,10 +1034,6 @@ public class PulsarService implements AutoCloseable {
         return transactionMetadataStoreService;
     }
 
-    public void setShutdownService(ShutdownService shutdownService) {
-        this.shutdownService = shutdownService;
-    }
-
     public ShutdownService getShutdownService() {
         return shutdownService;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 41ae985..eb7a555 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -85,7 +85,6 @@ public class SLAMonitoringTest {
             configurations[i] = config;
 
             pulsarServices[i] = new PulsarService(config);
-            pulsarServices[i].setShutdownService(new NoOpShutdownService());
             pulsarServices[i].start();
 
             brokerWebServicePorts[i] = pulsarServices[i].getListenPortHTTP().get();
@@ -215,7 +214,6 @@ public class SLAMonitoringTest {
         // Check if the namespace is properly unloaded and reowned by the broker
         try {
             pulsarServices[crashIndex] = new PulsarService(configurations[crashIndex]);
-            pulsarServices[crashIndex].setShutdownService(new NoOpShutdownService());
             pulsarServices[crashIndex].start();
 
             // Port for the broker will have changed since it's dynamically allocated
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 53e629b..26ea536 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -210,7 +209,6 @@ public abstract class MockedPulsarServiceBaseTest {
 
     protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
         PulsarService pulsar = spy(new PulsarService(conf));
-        pulsar.setShutdownService(new NoOpShutdownService());
 
         setupBrokerMocks(pulsar);
         boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 0951da4..229ee84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
@@ -116,7 +115,6 @@ public class AntiAffinityNamespaceGroupTest {
         config1.setAdvertisedAddress("localhost");
         createCluster(bkEnsemble.getZkClient(), config1);
         pulsar1 = new PulsarService(config1);
-        pulsar1.setShutdownService(new NoOpShutdownService());
         pulsar1.start();
 
         primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
@@ -132,7 +130,6 @@ public class AntiAffinityNamespaceGroupTest {
         config2.setBrokerServicePort(Optional.of(0));
         config2.setFailureDomainsEnabled(true);
         pulsar2 = new PulsarService(config2);
-        pulsar2.setShutdownService(new NoOpShutdownService());
         pulsar2.start();
 
         secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index d52a3ec..0ef5e6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -54,7 +54,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -147,7 +146,6 @@ public class LoadBalancerTest {
             config.setLoadBalancerEnabled(false);
 
             pulsarServices[i] = new PulsarService(config);
-            pulsarServices[i].setShutdownService(new NoOpShutdownService());
             pulsarServices[i].start();
             brokerWebServicePorts[i] = pulsarServices[i].getListenPortHTTP().get();
             brokerNativeBrokerPorts[i] = pulsarServices[i].getBrokerListenPort().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index ee05ce5..0a43889 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -52,7 +52,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -151,7 +150,6 @@ public class ModularLoadManagerImplTest {
         config1.setBrokerServicePortTls(Optional.of(0));
         config1.setWebServicePortTls(Optional.of(0));
         pulsar1 = new PulsarService(config1);
-        pulsar1.setShutdownService(new NoOpShutdownService());
         pulsar1.start();
 
         primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
@@ -169,7 +167,6 @@ public class ModularLoadManagerImplTest {
         config2.setBrokerServicePortTls(Optional.of(0));
         config2.setWebServicePortTls(Optional.of(0));
         pulsar2 = new PulsarService(config2);
-        pulsar2.setShutdownService(new NoOpShutdownService());
         pulsar2.start();
 
         secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
@@ -597,7 +594,6 @@ public class ModularLoadManagerImplTest {
         config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
         config.setBrokerServicePort(Optional.of(0));
         PulsarService pulsar = new PulsarService(config);
-        pulsar.setShutdownService(new NoOpShutdownService());
         // create znode using different zk-session
         final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
                 + config.getWebServicePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 0a814e7..999987b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -51,7 +51,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.SystemUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -130,7 +129,6 @@ public class SimpleLoadManagerImplTest {
         config1.setWebServicePortTls(Optional.of(0));
         config1.setAdvertisedAddress("localhost");
         pulsar1 = new PulsarService(config1);
-        pulsar1.setShutdownService(new NoOpShutdownService());
         pulsar1.start();
 
         url1 = new URL(pulsar1.getWebServiceAddress());
@@ -148,7 +146,6 @@ public class SimpleLoadManagerImplTest {
         config2.setBrokerServicePortTls(Optional.of(0));
         config2.setWebServicePortTls(Optional.of(0));
         pulsar2 = new PulsarService(config2);
-        pulsar2.setShutdownService(new NoOpShutdownService());
         pulsar2.start();
 
         url2 = new URL(pulsar2.getWebServiceAddress());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index fa012f4..593c7af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonObject;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -56,7 +55,6 @@ public class AdvertisedAddressTest {
         config.setManagedLedgerMaxEntriesPerLedger(5);
         config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         pulsar = new PulsarService(config);
-        pulsar.setShutdownService(new NoOpShutdownService());
         pulsar.start();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index ccd021b..b786c28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.broker.ConfigHelper;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -90,7 +89,6 @@ public class BacklogQuotaManagerTest {
             config.setAllowAutoTopicCreationType("non-partitioned");
 
             pulsar = new PulsarService(config);
-            pulsar.setShutdownService(new NoOpShutdownService());
             pulsar.start();
 
             adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 04845fd..a4bfe05 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
 
 import java.util.Optional;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -80,7 +79,6 @@ public abstract class BkEnsemblesTestBase {
             config.setAllowAutoTopicCreationType("non-partitioned");
 
             pulsar = new PulsarService(config);
-            pulsar.setShutdownService(new NoOpShutdownService());
             pulsar.start();
 
             admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index feea115..3aac32f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -44,7 +44,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.pulsar.broker.ManagedLedgerClientFactory;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
@@ -160,7 +159,6 @@ public class BrokerBookieIsolationTest {
         config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
         config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         pulsarService = new PulsarService(config);
-        pulsarService.setShutdownService(new NoOpShutdownService());
         pulsarService.start();
 
 
@@ -293,7 +291,6 @@ public class BrokerBookieIsolationTest {
         config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
         config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         pulsarService = new PulsarService(config);
-        pulsarService.setShutdownService(new NoOpShutdownService());
         pulsarService.start();
 
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress()).build();
@@ -413,7 +410,6 @@ public class BrokerBookieIsolationTest {
 
         config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         pulsarService = new PulsarService(config);
-        pulsarService.setShutdownService(new NoOpShutdownService());
         pulsarService.start();
 
         PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index 139aefb..7dd9969 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -24,7 +24,6 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -70,7 +69,6 @@ public class MaxMessageSizeTest {
             configuration.setMaxMessageSize(10 * 1024 * 1024);
 
             pulsar = new PulsarService(configuration);
-            pulsar.setShutdownService(new NoOpShutdownService());
             pulsar.start();
 
             String url = "http://127.0.0.1:" + pulsar.getListenPortHTTP().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 2b4976b..8f31a96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -63,7 +63,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -112,7 +111,6 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void setup() throws Exception {
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         PulsarService pulsar = spy(new PulsarService(svcConfig));
-        pulsar.setShutdownService(new NoOpShutdownService());
         doReturn(svcConfig).when(pulsar).getConfiguration();
 
         mlFactoryMock = mock(ManagedLedgerFactory.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index da8a203..cfc3937 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -40,7 +40,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -80,7 +79,6 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
         super.setUp(m);
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         PulsarService pulsar = spy(new PulsarService(svcConfig));
-        pulsar.setShutdownService(new NoOpShutdownService());
         doReturn(svcConfig).when(pulsar).getConfiguration();
 
         mlFactoryMock = mock(ManagedLedgerFactory.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 37cdec6..3c66d92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -82,7 +82,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -161,7 +160,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         executor = OrderedExecutor.newBuilder().numThreads(1).build();
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         pulsar = spy(new PulsarService(svcConfig));
-        pulsar.setShutdownService(new NoOpShutdownService());
         doReturn(svcConfig).when(pulsar).getConfiguration();
         doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index a53e39a..b427273 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -34,7 +34,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -129,7 +128,6 @@ public class ReplicatorTestBase {
         config1.setDefaultNumberOfNamespaceBundles(1);
         config1.setAllowAutoTopicCreationType("non-partitioned");
         pulsar1 = new PulsarService(config1);
-        pulsar1.setShutdownService(new NoOpShutdownService());
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
 
@@ -161,7 +159,6 @@ public class ReplicatorTestBase {
         config2.setDefaultNumberOfNamespaceBundles(1);
         config2.setAllowAutoTopicCreationType("non-partitioned");
         pulsar2 = new PulsarService(config2);
-        pulsar2.setShutdownService(new NoOpShutdownService());
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
 
@@ -193,7 +190,6 @@ public class ReplicatorTestBase {
         config3.setDefaultNumberOfNamespaceBundles(1);
         config3.setAllowAutoTopicCreationType("non-partitioned");
         pulsar3 = new PulsarService(config3);
-        pulsar3.setShutdownService(new NoOpShutdownService());
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index aa71801..51859d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -65,7 +65,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -153,7 +152,6 @@ public class ServerCnxTest {
         executor = OrderedExecutor.newBuilder().numThreads(1).build();
         svcConfig = spy(new ServiceConfiguration());
         pulsar = spy(new PulsarService(svcConfig));
-        pulsar.setShutdownService(new NoOpShutdownService());
         doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
 
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 0ad1824..3328f39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.collect.Sets;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -67,7 +66,6 @@ public class TopicOwnerTest {
             configurations[i] = config;
 
             pulsarServices[i] = new PulsarService(config);
-            pulsarServices[i].setShutdownService(new NoOpShutdownService());
             pulsarServices[i].start();
 
             pulsarAdmins[i] = PulsarAdmin.builder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 038b22d..d8bf47d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -49,7 +49,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -105,7 +104,6 @@ public class PersistentSubscriptionTest {
 
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         pulsarMock = spy(new PulsarService(svcConfig));
-        pulsarMock.setShutdownService(new NoOpShutdownService());
         doReturn(svcConfig).when(pulsarMock).getConfiguration();
         doReturn(mock(Compactor.class)).when(pulsarMock).getCompactor();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
index 4bb508c..2146eba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
@@ -65,7 +65,6 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -123,7 +122,6 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     public void setup() throws Exception {
         ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
         pulsar = spy(new PulsarService(svcConfig));
-        pulsar.setShutdownService(new NoOpShutdownService());
         doReturn(svcConfig).when(pulsar).getConfiguration();
         doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 225110f..325bf6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.coordinator;
 import java.util.Optional;
 
 import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -72,7 +71,6 @@ public class TransactionMetaStoreTestBase {
             configurations[i] = config;
 
             pulsarServices[i] = new PulsarService(config);
-            pulsarServices[i].setShutdownService(new NoOpShutdownService());
             pulsarServices[i].start();
 
             pulsarAdmins[i] = PulsarAdmin.builder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index fb42747..b9577e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -58,7 +58,6 @@ import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.pulsar.broker.MockedBookKeeperClientFactory;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -315,7 +314,6 @@ public class WebServiceTest {
         config.setZookeeperServers("localhost:2181");
         config.setHttpMaxRequestSize(10 * 1024);
         pulsar = spy(new PulsarService(config));
-        pulsar.setShutdownService(new NoOpShutdownService());
         doReturn(zkFactory).when(pulsar).getZooKeeperClientFactory();
         doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
         pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 9fd1a8d..c93968a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
@@ -98,8 +97,7 @@ public class ClientDeduplicationFailureTest {
         config.setAllowAutoTopicCreationType("non-partitioned");
 
 
-        pulsar = new PulsarService(config, Optional.empty());
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config);
         pulsar.start();
 
         String brokerServiceUrl = pulsar.getWebServiceAddress();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 9232642..49ee0a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -893,7 +892,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
             config1.setAllowAutoTopicCreationType("non-partitioned");
             pulsar1 = new PulsarService(config1);
-            pulsar1.setShutdownService(new NoOpShutdownService());
             pulsar1.start();
             ns1 = pulsar1.getBrokerService();
 
@@ -919,7 +917,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
             config2.setAllowAutoTopicCreationType("non-partitioned");
             pulsar2 = new PulsarService(config2);
-            pulsar2.setShutdownService(new NoOpShutdownService());
             pulsar2.start();
             ns2 = pulsar2.getBrokerService();
 
@@ -944,7 +941,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config3.setBrokerServicePort(Optional.of(0));
             config3.setAllowAutoTopicCreationType("non-partitioned");
             pulsar3 = new PulsarService(config3);
-            pulsar3.setShutdownService(new NoOpShutdownService());
             pulsar3.start();
             ns3 = pulsar3.getBrokerService();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 4e68681..48c40f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -44,7 +44,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.crypto.SecretKey;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -157,8 +156,7 @@ public class PulsarFunctionE2ESecurityTest {
                 "token:" +  adminToken);
         functionsWorkerService = createPulsarFunctionWorker(config);
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
-        pulsar = new PulsarService(config, functionWorkerService);
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
         pulsar.start();
 
         brokerServiceUrl = pulsar.getWebServiceAddress();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 48bb83f..d0afe77 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -43,7 +43,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -178,8 +177,7 @@ public class PulsarFunctionLocalRunTest {
         functionsWorkerService = createPulsarFunctionWorker(config);
 
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
-        pulsar = new PulsarService(config, functionWorkerService);
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
         pulsar.start();
 
         String brokerServiceUrl = pulsar.getWebServiceAddressTls();
@@ -702,4 +700,4 @@ public class PulsarFunctionLocalRunTest {
         String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServer.getAddress().getPort());
         testPulsarSinkStats(jarFilePathUrl);
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index e8b8aa8..cce8ce8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -31,7 +31,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -165,8 +164,7 @@ public class PulsarFunctionPublishTest {
         functionsWorkerService = createPulsarFunctionWorker(config);
 
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
-        pulsar = new PulsarService(config, functionWorkerService);
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
         pulsar.start();
 
         String brokerServiceUrl = pulsar.getWebServiceAddressTls();
@@ -485,4 +483,4 @@ public class PulsarFunctionPublishTest {
         assertEquals(files.size(), 0, "BK files left over: " + files);
 
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index ca9ce3b..70ab543 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -34,7 +34,6 @@ import java.util.Set;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -100,8 +99,7 @@ public class PulsarWorkerAssignmentTest {
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         final Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
-        pulsar = new PulsarService(config, functionWorkerService);
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
         pulsar.start();
 
         admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()).build());
@@ -333,4 +331,4 @@ public class PulsarWorkerAssignmentTest {
         return functionConfig;
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 26de3a6..28fd0b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -117,8 +116,7 @@ public class PulsarFunctionAdminTest {
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
-        pulsar = new PulsarService(config, functionWorkerService);
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
         pulsar.start();
         urlTls = new URL(pulsar.getBrokerServiceUrlTls());
 
@@ -203,4 +201,4 @@ public class PulsarFunctionAdminTest {
 
         return new WorkerService(workerConfig);
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 58d8ee8..f28c9eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -60,7 +60,6 @@ import java.util.regex.Pattern;
 
 import lombok.ToString;
 
-import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -192,8 +191,7 @@ public class PulsarFunctionE2ETest {
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
-        pulsar = new PulsarService(config, functionWorkerService);
-        pulsar.setShutdownService(new NoOpShutdownService());
+        pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
         pulsar.start();
 
         Map<String, String> authParams = new HashMap<>();