You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/05/28 17:21:39 UTC
[pulsar] branch master updated: Don't run Runtime#halt in tests (#7066)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bb7f47f Don't run Runtime#halt in tests (#7066)
bb7f47f is described below
commit bb7f47fca2cb5e284346d510c0ccec891fa2ff43
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 28 10:21:21 2020 -0700
Don't run Runtime#halt in tests (#7066)
When Runtime#halt is triggered from a test, as was happening on ZK
session loss, it kills the test JVM without reporting a result. We
shouldn't do that. Instead, reserve halt for production code, and just
log the request in tests.
Co-authored-by: Ivan Kelly <ik...@splunk.com>
---
.../org/apache/pulsar/PulsarBrokerStarter.java | 8 ++++-
.../java/org/apache/pulsar/PulsarStandalone.java | 7 +++-
.../broker/MessagingServiceShutdownHook.java | 11 ++++---
.../apache/pulsar/broker/NoOpShutdownService.java | 38 ----------------------
.../org/apache/pulsar/broker/PulsarService.java | 17 +++++-----
.../apache/pulsar/broker/SLAMonitoringTest.java | 2 --
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 --
.../AntiAffinityNamespaceGroupTest.java | 3 --
.../broker/loadbalance/LoadBalancerTest.java | 2 --
.../loadbalance/ModularLoadManagerImplTest.java | 4 ---
.../loadbalance/SimpleLoadManagerImplTest.java | 3 --
.../broker/service/AdvertisedAddressTest.java | 2 --
.../broker/service/BacklogQuotaManagerTest.java | 2 --
.../pulsar/broker/service/BkEnsemblesTestBase.java | 2 --
.../broker/service/BrokerBookieIsolationTest.java | 4 ---
.../pulsar/broker/service/MaxMessageSizeTest.java | 2 --
.../PersistentDispatcherFailoverConsumerTest.java | 2 --
.../service/PersistentTopicConcurrentTest.java | 2 --
.../pulsar/broker/service/PersistentTopicTest.java | 2 --
.../pulsar/broker/service/ReplicatorTestBase.java | 4 ---
.../pulsar/broker/service/ServerCnxTest.java | 2 --
.../pulsar/broker/service/TopicOwnerTest.java | 2 --
.../persistent/PersistentSubscriptionTest.java | 2 --
.../buffer/PersistentTransactionBufferTest.java | 2 --
.../coordinator/TransactionMetaStoreTestBase.java | 2 --
.../apache/pulsar/broker/web/WebServiceTest.java | 2 --
.../client/api/ClientDeduplicationFailureTest.java | 4 +--
.../pulsar/client/api/NonPersistentTopicTest.java | 4 ---
.../worker/PulsarFunctionE2ESecurityTest.java | 4 +--
.../worker/PulsarFunctionLocalRunTest.java | 6 ++--
.../worker/PulsarFunctionPublishTest.java | 6 ++--
.../worker/PulsarWorkerAssignmentTest.java | 6 ++--
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 6 ++--
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 4 +--
34 files changed, 40 insertions(+), 131 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 42a2cf1..c5b1aa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -207,7 +207,13 @@ public class PulsarBrokerStarter {
}
// init pulsar service
- pulsarService = new PulsarService(brokerConfig, Optional.ofNullable(functionsWorkerService));
+ pulsarService = new PulsarService(brokerConfig,
+ Optional.ofNullable(functionsWorkerService),
+ (exitCode) -> {
+ log.info("Halting broker process with code {}",
+ exitCode);
+ Runtime.getRuntime().halt(exitCode);
+ });
// if no argument to run bookie in cmd line, read from pulsar config
if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index be1b276..3fab76f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -315,7 +315,12 @@ public class PulsarStandalone implements AutoCloseable {
}
// Start Broker
- broker = new PulsarService(config, Optional.ofNullable(fnWorkerService));
+ broker = new PulsarService(config,
+ Optional.ofNullable(fnWorkerService),
+ (exitCode) -> {
+ log.info("Halting standalone process with code {}", exitCode);
+ Runtime.getRuntime().halt(exitCode);
+ });
broker.start();
broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index e09a589..6d1768c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
@@ -40,9 +41,11 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
private static final String LogbackLoggerContextClassName = "ch.qos.logback.classic.LoggerContext";
private PulsarService service = null;
+ private final Consumer<Integer> processTerminator;
- public MessagingServiceShutdownHook(PulsarService service) {
+ public MessagingServiceShutdownHook(PulsarService service, Consumer<Integer> processTerminator) {
this.service = service;
+ this.processTerminator = processTerminator;
}
@Override
@@ -76,8 +79,9 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
} finally {
immediateFlushBufferedLogs();
+
// always put system to halt immediately
- Runtime.getRuntime().halt(0);
+ processTerminator.accept(0);
}
}
@@ -96,8 +100,7 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
LOG.info("Invoking Runtime.halt({})", exitCode);
immediateFlushBufferedLogs();
- Runtime.getRuntime().halt(exitCode);
-
+ processTerminator.accept(exitCode);
}
public static void immediateFlushBufferedLogs() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/NoOpShutdownService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/NoOpShutdownService.java
deleted file mode 100644
index 6c70e9a..0000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/NoOpShutdownService.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
-
-@Slf4j
-public class NoOpShutdownService implements ShutdownService {
-
- @Override
- public void run() {
- shutdown(0);
- }
-
- @Override
- public void shutdown(int exitCode) {
- log.warn("Invoked shutdown with exitCode={}", exitCode);
- }
-
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 21a73a0..2584955 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.AccessLevel;
@@ -188,7 +189,7 @@ public class PulsarService implements AutoCloseable {
private final Optional<WorkerService> functionWorkerService;
private ProtocolHandlers protocolHandlers = null;
- private ShutdownService shutdownService;
+ private final ShutdownService shutdownService;
private MetricsGenerator metricsGenerator;
private TransactionMetadataStoreService transactionMetadataStoreService;
@@ -203,10 +204,14 @@ public class PulsarService implements AutoCloseable {
private final Condition isClosedCondition = mutex.newCondition();
public PulsarService(ServiceConfiguration config) {
- this(config, Optional.empty());
+ this(config, Optional.empty(), (exitCode) -> {
+ LOG.info("Process termination requested with code {}. "
+ + "Ignoring, as this constructor is intended for tests. ", exitCode);
+ });
}
- public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService) {
+ public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService,
+ Consumer<Integer> processTerminator) {
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
@@ -215,7 +220,7 @@ public class PulsarService implements AutoCloseable {
this.advertisedAddress = advertisedAddress(config);
this.brokerVersion = PulsarVersion.getVersion();
this.config = config;
- this.shutdownService = new MessagingServiceShutdownHook(this);
+ this.shutdownService = new MessagingServiceShutdownHook(this, processTerminator);
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
this.functionWorkerService = functionWorkerService;
@@ -1029,10 +1034,6 @@ public class PulsarService implements AutoCloseable {
return transactionMetadataStoreService;
}
- public void setShutdownService(ShutdownService shutdownService) {
- this.shutdownService = shutdownService;
- }
-
public ShutdownService getShutdownService() {
return shutdownService;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 41ae985..eb7a555 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -85,7 +85,6 @@ public class SLAMonitoringTest {
configurations[i] = config;
pulsarServices[i] = new PulsarService(config);
- pulsarServices[i].setShutdownService(new NoOpShutdownService());
pulsarServices[i].start();
brokerWebServicePorts[i] = pulsarServices[i].getListenPortHTTP().get();
@@ -215,7 +214,6 @@ public class SLAMonitoringTest {
// Check if the namespace is properly unloaded and reowned by the broker
try {
pulsarServices[crashIndex] = new PulsarService(configurations[crashIndex]);
- pulsarServices[crashIndex].setShutdownService(new NoOpShutdownService());
pulsarServices[crashIndex].start();
// Port for the broker will have changed since it's dynamically allocated
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 53e629b..26ea536 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -210,7 +209,6 @@ public abstract class MockedPulsarServiceBaseTest {
protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
PulsarService pulsar = spy(new PulsarService(conf));
- pulsar.setShutdownService(new NoOpShutdownService());
setupBrokerMocks(pulsar);
boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 0951da4..229ee84 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
@@ -116,7 +115,6 @@ public class AntiAffinityNamespaceGroupTest {
config1.setAdvertisedAddress("localhost");
createCluster(bkEnsemble.getZkClient(), config1);
pulsar1 = new PulsarService(config1);
- pulsar1.setShutdownService(new NoOpShutdownService());
pulsar1.start();
primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
@@ -132,7 +130,6 @@ public class AntiAffinityNamespaceGroupTest {
config2.setBrokerServicePort(Optional.of(0));
config2.setFailureDomainsEnabled(true);
pulsar2 = new PulsarService(config2);
- pulsar2.setShutdownService(new NoOpShutdownService());
pulsar2.start();
secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index d52a3ec..0ef5e6f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -54,7 +54,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -147,7 +146,6 @@ public class LoadBalancerTest {
config.setLoadBalancerEnabled(false);
pulsarServices[i] = new PulsarService(config);
- pulsarServices[i].setShutdownService(new NoOpShutdownService());
pulsarServices[i].start();
brokerWebServicePorts[i] = pulsarServices[i].getListenPortHTTP().get();
brokerNativeBrokerPorts[i] = pulsarServices[i].getBrokerListenPort().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index ee05ce5..0a43889 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -52,7 +52,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -151,7 +150,6 @@ public class ModularLoadManagerImplTest {
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
pulsar1 = new PulsarService(config1);
- pulsar1.setShutdownService(new NoOpShutdownService());
pulsar1.start();
primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
@@ -169,7 +167,6 @@ public class ModularLoadManagerImplTest {
config2.setBrokerServicePortTls(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
pulsar2 = new PulsarService(config2);
- pulsar2.setShutdownService(new NoOpShutdownService());
pulsar2.start();
secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
@@ -597,7 +594,6 @@ public class ModularLoadManagerImplTest {
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config.setBrokerServicePort(Optional.of(0));
PulsarService pulsar = new PulsarService(config);
- pulsar.setShutdownService(new NoOpShutdownService());
// create znode using different zk-session
final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
+ config.getWebServicePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 0a814e7..999987b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -51,7 +51,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.SystemUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -130,7 +129,6 @@ public class SimpleLoadManagerImplTest {
config1.setWebServicePortTls(Optional.of(0));
config1.setAdvertisedAddress("localhost");
pulsar1 = new PulsarService(config1);
- pulsar1.setShutdownService(new NoOpShutdownService());
pulsar1.start();
url1 = new URL(pulsar1.getWebServiceAddress());
@@ -148,7 +146,6 @@ public class SimpleLoadManagerImplTest {
config2.setBrokerServicePortTls(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
pulsar2 = new PulsarService(config2);
- pulsar2.setShutdownService(new NoOpShutdownService());
pulsar2.start();
url2 = new URL(pulsar2.getWebServiceAddress());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index fa012f4..593c7af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonObject;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -56,7 +55,6 @@ public class AdvertisedAddressTest {
config.setManagedLedgerMaxEntriesPerLedger(5);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsar = new PulsarService(config);
- pulsar.setShutdownService(new NoOpShutdownService());
pulsar.start();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index ccd021b..b786c28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.ConfigHelper;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -90,7 +89,6 @@ public class BacklogQuotaManagerTest {
config.setAllowAutoTopicCreationType("non-partitioned");
pulsar = new PulsarService(config);
- pulsar.setShutdownService(new NoOpShutdownService());
pulsar.start();
adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 04845fd..a4bfe05 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
import java.util.Optional;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -80,7 +79,6 @@ public abstract class BkEnsemblesTestBase {
config.setAllowAutoTopicCreationType("non-partitioned");
pulsar = new PulsarService(config);
- pulsar.setShutdownService(new NoOpShutdownService());
pulsar.start();
admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index feea115..3aac32f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -44,7 +44,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.broker.ManagedLedgerClientFactory;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
@@ -160,7 +159,6 @@ public class BrokerBookieIsolationTest {
config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsarService = new PulsarService(config);
- pulsarService.setShutdownService(new NoOpShutdownService());
pulsarService.start();
@@ -293,7 +291,6 @@ public class BrokerBookieIsolationTest {
config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsarService = new PulsarService(config);
- pulsarService.setShutdownService(new NoOpShutdownService());
pulsarService.start();
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress()).build();
@@ -413,7 +410,6 @@ public class BrokerBookieIsolationTest {
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsarService = new PulsarService(config);
- pulsarService.setShutdownService(new NoOpShutdownService());
pulsarService.start();
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index 139aefb..7dd9969 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -24,7 +24,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -70,7 +69,6 @@ public class MaxMessageSizeTest {
configuration.setMaxMessageSize(10 * 1024 * 1024);
pulsar = new PulsarService(configuration);
- pulsar.setShutdownService(new NoOpShutdownService());
pulsar.start();
String url = "http://127.0.0.1:" + pulsar.getListenPortHTTP().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 2b4976b..8f31a96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -63,7 +63,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -112,7 +111,6 @@ public class PersistentDispatcherFailoverConsumerTest {
public void setup() throws Exception {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
PulsarService pulsar = spy(new PulsarService(svcConfig));
- pulsar.setShutdownService(new NoOpShutdownService());
doReturn(svcConfig).when(pulsar).getConfiguration();
mlFactoryMock = mock(ManagedLedgerFactory.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index da8a203..cfc3937 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -40,7 +40,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -80,7 +79,6 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
super.setUp(m);
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
PulsarService pulsar = spy(new PulsarService(svcConfig));
- pulsar.setShutdownService(new NoOpShutdownService());
doReturn(svcConfig).when(pulsar).getConfiguration();
mlFactoryMock = mock(ManagedLedgerFactory.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 37cdec6..3c66d92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -82,7 +82,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -161,7 +160,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
executor = OrderedExecutor.newBuilder().numThreads(1).build();
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
pulsar = spy(new PulsarService(svcConfig));
- pulsar.setShutdownService(new NoOpShutdownService());
doReturn(svcConfig).when(pulsar).getConfiguration();
doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index a53e39a..b427273 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -34,7 +34,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -129,7 +128,6 @@ public class ReplicatorTestBase {
config1.setDefaultNumberOfNamespaceBundles(1);
config1.setAllowAutoTopicCreationType("non-partitioned");
pulsar1 = new PulsarService(config1);
- pulsar1.setShutdownService(new NoOpShutdownService());
pulsar1.start();
ns1 = pulsar1.getBrokerService();
@@ -161,7 +159,6 @@ public class ReplicatorTestBase {
config2.setDefaultNumberOfNamespaceBundles(1);
config2.setAllowAutoTopicCreationType("non-partitioned");
pulsar2 = new PulsarService(config2);
- pulsar2.setShutdownService(new NoOpShutdownService());
pulsar2.start();
ns2 = pulsar2.getBrokerService();
@@ -193,7 +190,6 @@ public class ReplicatorTestBase {
config3.setDefaultNumberOfNamespaceBundles(1);
config3.setAllowAutoTopicCreationType("non-partitioned");
pulsar3 = new PulsarService(config3);
- pulsar3.setShutdownService(new NoOpShutdownService());
pulsar3.start();
ns3 = pulsar3.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index aa71801..51859d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -65,7 +65,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -153,7 +152,6 @@ public class ServerCnxTest {
executor = OrderedExecutor.newBuilder().numThreads(1).build();
svcConfig = spy(new ServiceConfiguration());
pulsar = spy(new PulsarService(svcConfig));
- pulsar.setShutdownService(new NoOpShutdownService());
doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 0ad1824..3328f39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
import com.google.common.collect.Sets;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -67,7 +66,6 @@ public class TopicOwnerTest {
configurations[i] = config;
pulsarServices[i] = new PulsarService(config);
- pulsarServices[i].setShutdownService(new NoOpShutdownService());
pulsarServices[i].start();
pulsarAdmins[i] = PulsarAdmin.builder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 038b22d..d8bf47d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -49,7 +49,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -105,7 +104,6 @@ public class PersistentSubscriptionTest {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
pulsarMock = spy(new PulsarService(svcConfig));
- pulsarMock.setShutdownService(new NoOpShutdownService());
doReturn(svcConfig).when(pulsarMock).getConfiguration();
doReturn(mock(Compactor.class)).when(pulsarMock).getCompactor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
index 4bb508c..2146eba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
@@ -65,7 +65,6 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -123,7 +122,6 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
public void setup() throws Exception {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
pulsar = spy(new PulsarService(svcConfig));
- pulsar.setShutdownService(new NoOpShutdownService());
doReturn(svcConfig).when(pulsar).getConfiguration();
doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 225110f..325bf6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.coordinator;
import java.util.Optional;
import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -72,7 +71,6 @@ public class TransactionMetaStoreTestBase {
configurations[i] = config;
pulsarServices[i] = new PulsarService(config);
- pulsarServices[i].setShutdownService(new NoOpShutdownService());
pulsarServices[i].start();
pulsarAdmins[i] = PulsarAdmin.builder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index fb42747..b9577e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -58,7 +58,6 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.pulsar.broker.MockedBookKeeperClientFactory;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -315,7 +314,6 @@ public class WebServiceTest {
config.setZookeeperServers("localhost:2181");
config.setHttpMaxRequestSize(10 * 1024);
pulsar = spy(new PulsarService(config));
- pulsar.setShutdownService(new NoOpShutdownService());
doReturn(zkFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 9fd1a8d..c93968a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
@@ -98,8 +97,7 @@ public class ClientDeduplicationFailureTest {
config.setAllowAutoTopicCreationType("non-partitioned");
- pulsar = new PulsarService(config, Optional.empty());
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config);
pulsar.start();
String brokerServiceUrl = pulsar.getWebServiceAddress();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 9232642..49ee0a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -893,7 +892,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setAllowAutoTopicCreationType("non-partitioned");
pulsar1 = new PulsarService(config1);
- pulsar1.setShutdownService(new NoOpShutdownService());
pulsar1.start();
ns1 = pulsar1.getBrokerService();
@@ -919,7 +917,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setAllowAutoTopicCreationType("non-partitioned");
pulsar2 = new PulsarService(config2);
- pulsar2.setShutdownService(new NoOpShutdownService());
pulsar2.start();
ns2 = pulsar2.getBrokerService();
@@ -944,7 +941,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config3.setBrokerServicePort(Optional.of(0));
config3.setAllowAutoTopicCreationType("non-partitioned");
pulsar3 = new PulsarService(config3);
- pulsar3.setShutdownService(new NoOpShutdownService());
pulsar3.start();
ns3 = pulsar3.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 4e68681..48c40f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -44,7 +44,6 @@ import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -157,8 +156,7 @@ public class PulsarFunctionE2ESecurityTest {
"token:" + adminToken);
functionsWorkerService = createPulsarFunctionWorker(config);
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
- pulsar = new PulsarService(config, functionWorkerService);
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
pulsar.start();
brokerServiceUrl = pulsar.getWebServiceAddress();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 48bb83f..d0afe77 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -43,7 +43,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -178,8 +177,7 @@ public class PulsarFunctionLocalRunTest {
functionsWorkerService = createPulsarFunctionWorker(config);
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
- pulsar = new PulsarService(config, functionWorkerService);
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
pulsar.start();
String brokerServiceUrl = pulsar.getWebServiceAddressTls();
@@ -702,4 +700,4 @@ public class PulsarFunctionLocalRunTest {
String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServer.getAddress().getPort());
testPulsarSinkStats(jarFilePathUrl);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index e8b8aa8..cce8ce8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -31,7 +31,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -165,8 +164,7 @@ public class PulsarFunctionPublishTest {
functionsWorkerService = createPulsarFunctionWorker(config);
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
- pulsar = new PulsarService(config, functionWorkerService);
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
pulsar.start();
String brokerServiceUrl = pulsar.getWebServiceAddressTls();
@@ -485,4 +483,4 @@ public class PulsarFunctionPublishTest {
assertEquals(files.size(), 0, "BK files left over: " + files);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index ca9ce3b..70ab543 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -34,7 +34,6 @@ import java.util.Set;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -100,8 +99,7 @@ public class PulsarWorkerAssignmentTest {
functionsWorkerService = createPulsarFunctionWorker(config);
final Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
- pulsar = new PulsarService(config, functionWorkerService);
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
pulsar.start();
admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()).build());
@@ -333,4 +331,4 @@ public class PulsarWorkerAssignmentTest {
return functionConfig;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 26de3a6..28fd0b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -117,8 +116,7 @@ public class PulsarFunctionAdminTest {
functionsWorkerService = createPulsarFunctionWorker(config);
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
- pulsar = new PulsarService(config, functionWorkerService);
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
pulsar.start();
urlTls = new URL(pulsar.getBrokerServiceUrlTls());
@@ -203,4 +201,4 @@ public class PulsarFunctionAdminTest {
return new WorkerService(workerConfig);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 58d8ee8..f28c9eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -60,7 +60,6 @@ import java.util.regex.Pattern;
import lombok.ToString;
-import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -192,8 +191,7 @@ public class PulsarFunctionE2ETest {
functionsWorkerService = createPulsarFunctionWorker(config);
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
- pulsar = new PulsarService(config, functionWorkerService);
- pulsar.setShutdownService(new NoOpShutdownService());
+ pulsar = new PulsarService(config, functionWorkerService, (exitCode) -> {});
pulsar.start();
Map<String, String> authParams = new HashMap<>();