You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/04/16 15:59:54 UTC
[pulsar] branch master updated: Shutdown Broker gracefully,
but forcefully after brokerShutdownTimeoutMs (#10199)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 152d1e6 Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs (#10199)
152d1e6 is described below
commit 152d1e69d16b510521003a33ea877ae33f0ce8e5
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 16 18:59:03 2021 +0300
Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs (#10199)
* Wait for shutdown of BrokerService event loops
* Move CompletableFutureCancellationHandler to pulsar-common util
* Prevent misuse of CompletableFutureCancellationHandler
* Clear cancelAction field after the future completes
- make cancelAction eligible for GC after the future completes (or
gets cancelled)
* Support cancel signalling when executing multiple futures
* Shutdown BrokerService gracefully when using closeAsync
- shutdown forcefully when closing times out
* Set 100ms to brokerShutdownTimeoutMs used in tests
* Revert changes in MessagingServiceShutdownHook
* Handle CancellationException since it's used in timeouts in BrokerService
* Set shutdown timeout to 0 ms in tests
* Ignore TimeoutException and CancellationException when broker shutdown timeout is 0
* Extract GracefulExecutorServicesShutdown and use it in PulsarService
- handle graceful / forceful shutdown also for PulsarService executors
* Fix some unclosed PulsarServices
* Set shutdown timeout to 0 in some more tests
* Do some class and method renamings to clarify the code
* Revisit the logic to use awaitTermination
* Use shutdownNow to shutdown the scheduler used for future timeout handling
---
.../broker/MessagingServiceShutdownHook.java | 3 +-
.../org/apache/pulsar/broker/PulsarService.java | 91 ++++++-----
.../pulsar/broker/service/BrokerService.java | 144 +++++++++++++-----
.../service/GracefulExecutorServicesShutdown.java | 117 ++++++++++++++
...GracefulExecutorServicesTerminationHandler.java | 160 ++++++++++++++++++++
.../apache/pulsar/broker/PulsarServiceTest.java | 7 +-
.../apache/pulsar/broker/SLAMonitoringTest.java | 6 +-
.../broker/admin/BrokerAdminClientTlsAuthTest.java | 1 +
.../broker/auth/MockedPulsarServiceBaseTest.java | 5 +
.../AntiAffinityNamespaceGroupTest.java | 2 +
.../loadbalance/LeaderElectionServiceTest.java | 1 +
.../broker/loadbalance/LoadBalancerTest.java | 1 +
.../loadbalance/ModularLoadManagerImplTest.java | 3 +
.../loadbalance/SimpleLoadManagerImplTest.java | 2 +
.../OwnerShipForCurrentServerTestBase.java | 1 +
.../broker/service/AdvertisedAddressTest.java | 1 +
.../broker/service/BacklogQuotaManagerTest.java | 1 +
.../pulsar/broker/service/BkEnsemblesTestBase.java | 1 +
.../broker/service/BrokerBookieIsolationTest.java | 3 +
.../GracefulExecutorServicesShutdownTest.java | 168 +++++++++++++++++++++
.../pulsar/broker/service/MaxMessageSizeTest.java | 1 +
.../PersistentDispatcherFailoverConsumerTest.java | 1 +
.../service/PersistentTopicConcurrentTest.java | 3 +
.../pulsar/broker/service/PersistentTopicTest.java | 1 +
.../pulsar/broker/service/ReplicatorTestBase.java | 1 +
.../pulsar/broker/service/ServerCnxTest.java | 1 +
.../pulsar/broker/service/TopicOwnerTest.java | 1 +
.../persistent/PersistentSubscriptionTest.java | 1 +
.../broker/transaction/TransactionTestBase.java | 1 +
.../coordinator/TransactionMetaStoreTestBase.java | 1 +
.../apache/pulsar/broker/web/WebServiceTest.java | 1 +
.../pulsar/client/api/BrokerServiceLookupTest.java | 8 +
.../client/api/ClientDeduplicationFailureTest.java | 1 +
.../pulsar/client/api/NonPersistentTopicTest.java | 3 +
.../pulsar/client/api/ServiceUrlProviderTest.java | 1 +
.../worker/PulsarFunctionE2ESecurityTest.java | 1 +
.../worker/PulsarFunctionLocalRunTest.java | 1 +
.../worker/PulsarFunctionPublishTest.java | 1 +
.../functions/worker/PulsarFunctionTlsTest.java | 1 +
.../worker/PulsarWorkerAssignmentTest.java | 1 +
.../apache/pulsar/io/AbstractPulsarE2ETest.java | 1 +
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 1 +
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 1 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 1 +
.../client/impl/MultiTopicsConsumerImpl.java | 1 +
.../CompletableFutureCancellationHandler.java | 40 +++--
.../org/apache/pulsar/common/util/FutureUtil.java | 45 +++++-
.../pulsar/common/util/netty/EventLoopUtil.java | 10 ++
.../pulsar/common/util/netty/NettyFutureUtil.java | 85 +++++++++++
.../CompletableFutureCancellationHandlerTest.java | 8 +-
50 files changed, 841 insertions(+), 101 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index 0addf1e..187f4cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -76,7 +77,7 @@ public class MessagingServiceShutdownHook extends Thread implements ShutdownServ
future.get(service.getConfiguration().getBrokerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
LOG.info("Completed graceful shutdown. Exiting");
- } catch (TimeoutException e) {
+ } catch (TimeoutException | CancellationException e) {
LOG.warn("Graceful shutdown timeout expired. Closing now");
} catch (Exception e) {
LOG.error("Failed to perform graceful shutdown, Exiting anyway", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7d7787c..17bb041 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -40,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -89,6 +92,7 @@ import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
@@ -164,6 +168,7 @@ import org.slf4j.LoggerFactory;
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
+ private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private ServiceConfiguration config = null;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
@@ -236,7 +241,7 @@ public class PulsarService implements AutoCloseable {
private PulsarResources pulsarResources;
public enum State {
- Init, Started, Closed
+ Init, Started, Closing, Closed
}
private volatile State state;
@@ -308,10 +313,15 @@ public class PulsarService implements AutoCloseable {
try {
closeAsync().get();
} catch (ExecutionException e) {
- if (e.getCause() instanceof PulsarServerException) {
- throw (PulsarServerException) e.getCause();
+ Throwable cause = e.getCause();
+ if (cause instanceof PulsarServerException) {
+ throw (PulsarServerException) cause;
+ } else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
+ && (cause instanceof TimeoutException || cause instanceof CancellationException)) {
+ // ignore shutdown timeout when timeout is 0, which is primarily used in tests
+ // to forcefully shutdown the broker
} else {
- throw new PulsarServerException(e.getCause());
+ throw new PulsarServerException(cause);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -327,6 +337,7 @@ public class PulsarService implements AutoCloseable {
if (closeFuture != null) {
return closeFuture;
}
+ state = State.Closing;
// close the service in reverse order v.s. in which they are started
if (this.webService != null) {
@@ -345,6 +356,15 @@ public class PulsarService implements AutoCloseable {
this.webSocketService.close();
}
+ GracefulExecutorServicesShutdown executorServicesShutdown =
+ GracefulExecutorServicesShutdown
+ .initiate()
+ .timeout(
+ Duration.ofMillis(
+ (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+ * getConfiguration()
+ .getBrokerShutdownTimeoutMs())));
+
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
asyncCloseFutures.add(this.brokerService.closeAsync());
@@ -366,7 +386,7 @@ public class PulsarService implements AutoCloseable {
this.leaderElectionService = null;
}
- loadManagerExecutor.shutdown();
+ executorServicesShutdown.shutdown(loadManagerExecutor);
if (globalZkCache != null) {
globalZkCache.close();
@@ -402,24 +422,11 @@ public class PulsarService implements AutoCloseable {
nsService = null;
}
- if (compactorExecutor != null) {
- compactorExecutor.shutdown();
- }
-
- if (offloaderScheduler != null) {
- offloaderScheduler.shutdown();
- }
-
- // executor is not initialized in mocks even when real close method is called
- // guard against null executors
- if (executor != null) {
- executor.shutdown();
- }
-
- if (orderedExecutor != null) {
- orderedExecutor.shutdown();
- }
- cacheExecutor.shutdown();
+ executorServicesShutdown.shutdown(compactorExecutor);
+ executorServicesShutdown.shutdown(offloaderScheduler);
+ executorServicesShutdown.shutdown(executor);
+ executorServicesShutdown.shutdown(orderedExecutor);
+ executorServicesShutdown.shutdown(cacheExecutor);
LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
@@ -441,10 +448,7 @@ public class PulsarService implements AutoCloseable {
transactionBufferClient.close();
}
- if (transactionExecutor != null) {
- transactionExecutor.shutdown();
- transactionExecutor = null;
- }
+ executorServicesShutdown.shutdown(transactionExecutor);
if (coordinationService != null) {
coordinationService.close();
@@ -457,20 +461,23 @@ public class PulsarService implements AutoCloseable {
configurationMetadataStore.close();
}
- state = State.Closed;
- isClosedCondition.signalAll();
+ // add timeout handling for closing executors
+ asyncCloseFutures.add(executorServicesShutdown.handle());
- CompletableFuture<Void> shutdownFuture =
- CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]));
- closeFuture = shutdownFuture;
- return shutdownFuture;
+ closeFuture = addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
+ closeFuture.handle((v, t) -> {
+ state = State.Closed;
+ isClosedCondition.signalAll();
+ return null;
+ });
+ return closeFuture;
} catch (Exception e) {
PulsarServerException pse;
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
- pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
+ pse = new PulsarServerException(MetadataStoreException.unwrap(e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
- pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
+ pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
} else {
pse = new PulsarServerException(e);
}
@@ -480,6 +487,20 @@ public class PulsarService implements AutoCloseable {
}
}
+ private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
+ ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
+ FutureUtil.addTimeoutHandling(future,
+ Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())), // at least 1 ms
+ shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
+ future.handle((v, t) -> {
+ // the executor is only used for the timeout handling above; stop it once the future has completed
+ shutdownExecutor.shutdownNow();
+ return null;
+ });
+ return future;
+ }
+
/**
* Get the current service configuration.
*
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 718cb29..c1961f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -54,11 +54,13 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
@@ -165,6 +167,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
@@ -187,6 +190,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class,
"futureWithDeadline(...)");
+ private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L;
+ private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
+ private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private final PulsarService pulsar;
private final ManagedLedgerFactory managedLedgerFactory;
@@ -694,60 +700,118 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
});
- List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
+ CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
+ CompletableFuture<Void> shutdownFuture =
+ CompletableFuture.allOf(shutdownEventLoopGracefully(acceptorGroup),
+ shutdownEventLoopGracefully(workerGroup))
+ .handle((v, t) -> {
+ if (t != null) {
+ log.warn("Error shutting down event loops gracefully", t);
+ } else {
+ log.info("Event loops shutdown completed.");
+ }
+ return null;
+ })
+ .thenCompose(__ -> {
+ log.info("Continuing to second phase in shutdown.");
- if (listenChannel != null && listenChannel.isOpen()) {
- asyncCloseFutures.add(closeChannel(listenChannel));
- }
+ List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
- if (listenChannelTls != null && listenChannelTls.isOpen()) {
- asyncCloseFutures.add(closeChannel(listenChannelTls));
- }
+ if (listenChannel != null && listenChannel.isOpen()) {
+ asyncCloseFutures.add(closeChannel(listenChannel));
+ }
- acceptorGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
+ if (listenChannelTls != null && listenChannelTls.isOpen()) {
+ asyncCloseFutures.add(closeChannel(listenChannelTls));
+ }
- if (interceptor != null) {
- interceptor.close();
- interceptor = null;
- }
+ if (interceptor != null) {
+ interceptor.close();
+ interceptor = null;
+ }
- statsUpdater.shutdown();
- inactivityMonitor.shutdown();
- messageExpiryMonitor.shutdown();
- compactionMonitor.shutdown();
- messagePublishBufferMonitor.shutdown();
- consumedLedgersMonitor.shutdown();
- backlogQuotaChecker.shutdown();
- authenticationService.close();
- pulsarStats.close();
- ClientCnxnAspect.removeListener(zkStatsListener);
- ClientCnxnAspect.registerExecutor(null);
- topicOrderedExecutor.shutdown();
- delayedDeliveryTrackerFactory.close();
- if (topicPublishRateLimiterMonitor != null) {
- topicPublishRateLimiterMonitor.shutdown();
- }
- if (brokerPublishRateLimiterMonitor != null) {
- brokerPublishRateLimiterMonitor.shutdown();
- }
- if (deduplicationSnapshotMonitor != null) {
- deduplicationSnapshotMonitor.shutdown();
- }
+ try {
+ authenticationService.close();
+ } catch (IOException e) {
+ log.warn("Error in closing authenticationService", e);
+ }
+ pulsarStats.close();
+ ClientCnxnAspect.removeListener(zkStatsListener);
+ ClientCnxnAspect.registerExecutor(null);
+ try {
+ delayedDeliveryTrackerFactory.close();
+ } catch (IOException e) {
+ log.warn("Error in closing delayedDeliveryTrackerFactory", e);
+ }
- CompletableFuture<Void> shutdownFuture =
- CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]))
- .thenAccept(__ -> log.info("Broker service completely shut down"));
+ asyncCloseFutures.add(GracefulExecutorServicesShutdown
+ .initiate()
+ .timeout(
+ Duration.ofMillis(
+ (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+ * pulsar.getConfiguration()
+ .getBrokerShutdownTimeoutMs())))
+ .shutdown(
+ statsUpdater,
+ inactivityMonitor,
+ messageExpiryMonitor,
+ compactionMonitor,
+ messagePublishBufferMonitor,
+ consumedLedgersMonitor,
+ backlogQuotaChecker,
+ topicOrderedExecutor,
+ topicPublishRateLimiterMonitor,
+ brokerPublishRateLimiterMonitor,
+ deduplicationSnapshotMonitor)
+ .handle());
+
+ CompletableFuture<Void> combined =
+ FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
+ cancellableDownstreamFutureReference.complete(combined);
+ combined.handle((v, t) -> {
+ if (t == null) {
+ log.info("Broker service completely shut down");
+ } else {
+ if (t instanceof CancellationException) {
+ log.warn("Broker service didn't complete gracefully. "
+ + "Terminating Broker service.");
+ } else {
+ log.warn("Broker service shut down completed with exception", t);
+ }
+ }
+ return null;
+ });
+ return combined;
+ });
+ FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference
+ .thenAccept(future -> future.cancel(false)));
return shutdownFuture;
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
}
CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) {
+ long brokerShutdownTimeoutMs = pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
+ long quietPeriod = Math.min((long) (
+ GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs),
+ GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS); // 25% of the total timeout, capped at 5 seconds
+ long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+ return NettyFutureUtil.toCompletableFutureVoid(
+ eventLoopGroup.shutdownGracefully(quietPeriod,
+ timeout, TimeUnit.MILLISECONDS)); // timeout is 50% of the total timeout
+ }
+
private CompletableFuture<Void> closeChannel(Channel channel) {
return ChannelFutures.toCompletableFuture(channel.close())
- // convert to CompletableFuture<Void>
- .thenAccept(__ -> {});
+ .handle((c, t) -> {
+ // log a warning if closing the channel fails; the returned future still completes normally
+ // RejectedExecutionException is ignored (presumably the event loop was already shut down)
+ if (t != null && !(t instanceof RejectedExecutionException)) {
+ log.warn("Cannot close channel {}", channel, t);
+ }
+ return null;
+ });
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
new file mode 100644
index 0000000..1c7134f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdown.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a builder-like class that provides a fluent API for gracefully shutting down executors.
+ *
+ * Executors are added with the {@link #shutdown(ExecutorService...)}
+ * method, which calls {@link ExecutorService#shutdown()} on each of them immediately.
+ *
+ * Calling the {@link #handle()} method returns a future which completes when all executors
+ * have been terminated.
+ *
+ * Termination of the executors is awaited with the {@link ExecutorService#awaitTermination(long, TimeUnit)} method.
+ * If the shutdown times out or the future is cancelled, all executors will be terminated forcefully and the
+ * termination timeout value will be used for waiting for termination.
+ * The default value for the termination timeout is 10% of the shutdown timeout.
+ */
+public class GracefulExecutorServicesShutdown {
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(15);
+ private static final Double DEFAULT_TERMINATION_TIMEOUT_RATIO = 0.1d;
+ private final List<ExecutorService> executorServices = new ArrayList<>();
+ private Duration timeout = DEFAULT_TIMEOUT;
+ private Duration terminationTimeout;
+
+ private GracefulExecutorServicesShutdown() {
+
+ }
+
+ /**
+ * Initiates a new shutdown for one or many {@link ExecutorService}s.
+ *
+ * @return a new instance for controlling graceful shutdown
+ */
+ public static GracefulExecutorServicesShutdown initiate() {
+ return new GracefulExecutorServicesShutdown();
+ }
+
+ /**
+ * Calls {@link ExecutorService#shutdown()} on each given executor and enlists it as part of the
+ * shutdown handling. Entries that are {@code null} are skipped.
+ *
+ * @param executorServices one or more executors to shut down
+ * @return the current instance for controlling graceful shutdown
+ */
+ public GracefulExecutorServicesShutdown shutdown(ExecutorService... executorServices) {
+ for (ExecutorService executorService : executorServices) {
+ if (executorService != null) {
+ executorService.shutdown();
+ this.executorServices.add(executorService);
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Sets the timeout for graceful shutdown. The timeout is 15 seconds when this method isn't called.
+ *
+ * @param timeout duration for the timeout
+ * @return the current instance for controlling graceful shutdown
+ */
+ public GracefulExecutorServicesShutdown timeout(Duration timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Sets the timeout for waiting for executors to complete in forceful termination.
+ *
+ * @param terminationTimeout duration for the timeout
+ * @return the current instance for controlling graceful shutdown
+ */
+ public GracefulExecutorServicesShutdown terminationTimeout(Duration terminationTimeout) {
+ this.terminationTimeout = terminationTimeout;
+ return this;
+ }
+
+ /**
+ * Starts the handler which waits for the termination of the enlisted executors.
+ *
+ * If the termination times out or the future is cancelled, all active executors will be forcefully
+ * terminated by calling {@link ExecutorService#shutdownNow()}.
+ *
+ * @return a future which completes when all executors have terminated
+ */
+ public CompletableFuture<Void> handle() {
+ // if termination timeout isn't provided, calculate a termination timeout based on the shutdown timeout
+ if (terminationTimeout == null) {
+ terminationTimeout = Duration.ofNanos((long) (timeout.toNanos() * DEFAULT_TERMINATION_TIMEOUT_RATIO));
+ }
+ return new GracefulExecutorServicesTerminationHandler(timeout, terminationTimeout,
+ executorServices).getFuture();
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
new file mode 100644
index 0000000..9abe078
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/GracefulExecutorServicesTerminationHandler.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Waits for the termination of {@link ExecutorService}s that have already been shut down.
+ *
+ * The executors will be terminated forcefully after the timeout or when the future is cancelled.
+ *
+ * Designed to be used via the API in {@link GracefulExecutorServicesShutdown}.
+ */
+@Slf4j
+class GracefulExecutorServicesTerminationHandler {
+ private static final long SHUTDOWN_THREAD_COMPLETION_TIMEOUT_NANOS = Duration.ofMillis(100L).toNanos();
+ private final List<ExecutorService> executors;
+ private final CompletableFuture<Void> future;
+ private final Duration shutdownTimeout;
+ private final Duration terminationTimeout;
+ private final CountDownLatch shutdownThreadCompletedLatch = new CountDownLatch(1);
+
+ GracefulExecutorServicesTerminationHandler(Duration shutdownTimeout, Duration terminationTimeout,
+ List<ExecutorService> executorServices) {
+ this.shutdownTimeout = shutdownTimeout;
+ this.terminationTimeout = terminationTimeout;
+ this.executors = Collections.unmodifiableList(new ArrayList<>(executorServices));
+ this.future = new CompletableFuture<>();
+ log.info("Starting termination handler for {} executors.", executors.size());
+ for (ExecutorService executor : executors) {
+ if (!executor.isShutdown()) {
+ throw new IllegalStateException(
+ String.format("Executor %s should have been shutdown before entering the termination handler.",
+ executor));
+ }
+ }
+ if (haveExecutorsBeenTerminated()) {
+ markShutdownCompleted();
+ } else {
+ if (shutdownTimeout.isZero() || shutdownTimeout.isNegative()) { // no graceful waiting period
+ terminateExecutors();
+ markShutdownCompleted();
+ } else {
+ Thread shutdownWaitingThread = new Thread(this::awaitShutdown, getClass().getSimpleName());
+ shutdownWaitingThread.start();
+ FutureUtil.whenCancelledOrTimedOut(future, () -> {
+ shutdownWaitingThread.interrupt(); // ends the graceful wait in awaitShutdown
+ waitUntilShutdownWaitingThreadIsCompleted();
+ });
+ }
+ }
+ }
+
+ public CompletableFuture<Void> getFuture() {
+ return future;
+ }
+
+ private boolean haveExecutorsBeenTerminated() {
+ return executors.stream().allMatch(ExecutorService::isTerminated);
+ }
+
+ private void markShutdownCompleted() {
+ log.info("Shutdown completed.");
+ future.complete(null);
+ }
+
+ private void awaitShutdown() {
+ try {
+ awaitTermination(shutdownTimeout);
+ terminateExecutors(); // calls shutdownNow() only on executors that are still active
+ markShutdownCompleted();
+ } catch (Exception e) {
+ log.error("Error in termination handler", e);
+ future.completeExceptionally(e);
+ } finally {
+ shutdownThreadCompletedLatch.countDown();
+ }
+ }
+
+ private boolean awaitTermination(Duration timeout) {
+ if (!timeout.isZero() && !timeout.isNegative()) {
+ long awaitUntilNanos = System.nanoTime() + timeout.toNanos();
+ while (!Thread.currentThread().isInterrupted() && System.nanoTime() < awaitUntilNanos) {
+ int activeExecutorsCount = executors.size();
+ for (ExecutorService executor : executors) {
+ long remainingTimeNanos = awaitUntilNanos - System.nanoTime();
+ if (remainingTimeNanos > 0) {
+ try {
+ if (executor.isTerminated()
+ || executor.awaitTermination(remainingTimeNanos, TimeUnit.NANOSECONDS)) {
+ activeExecutorsCount--;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag so that the outer loop exits too
+ break;
+ }
+ }
+ }
+ if (activeExecutorsCount == 0) {
+ return true;
+ }
+ }
+ }
+ return haveExecutorsBeenTerminated();
+ }
+
+ private void terminateExecutors() {
+ for (ExecutorService executor : executors) {
+ if (!executor.isTerminated()) {
+ log.info("Shutting down forcefully executor {}", executor);
+ executor.shutdownNow();
+ }
+ }
+ if (!Thread.currentThread().isInterrupted() && !awaitTermination(terminationTimeout)) {
+ for (ExecutorService executor : executors) {
+ if (!executor.isTerminated()) {
+ log.warn("Executor {} didn't shutdown after waiting for termination.", executor);
+ for (Runnable runnable : executor.shutdownNow()) { // returns the tasks that never started
+ log.info("Execution in progress for runnable instance of {}: {}", runnable.getClass(),
+ runnable);
+ }
+ }
+ }
+ }
+ }
+
+ private void waitUntilShutdownWaitingThreadIsCompleted() {
+ try {
+ shutdownThreadCompletedLatch.await(terminationTimeout.toNanos()
+ + SHUTDOWN_THREAD_COMPLETION_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 0ff1ca1..a326d57 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertSame;
import java.util.Optional;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -33,12 +34,14 @@ import org.testng.annotations.Test;
public class PulsarServiceTest {
@Test
- public void testGetWorkerService() {
+ public void testGetWorkerService() throws Exception {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setZookeeperServers("localhost");
configuration.setClusterName("clusterName");
configuration.setFunctionsWorkerEnabled(true);
+ configuration.setBrokerShutdownTimeoutMs(0L);
WorkerService expectedWorkerService = mock(WorkerService.class);
+ @Cleanup
PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(),
Optional.of(expectedWorkerService), (exitCode) -> {}));
@@ -56,6 +59,8 @@ public class PulsarServiceTest {
configuration.setZookeeperServers("localhost");
configuration.setClusterName("clusterName");
configuration.setFunctionsWorkerEnabled(false);
+ configuration.setBrokerShutdownTimeoutMs(0L);
+ @Cleanup
PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(),
Optional.empty(), (exitCode) -> {});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 7e09ab9..4437b43 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -22,7 +22,6 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import java.net.URL;
import java.util.HashSet;
import java.util.List;
@@ -33,9 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -79,12 +76,13 @@ public class SLAMonitoringTest {
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerShutdownTimeoutMs(0L);
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
- config.setBrokerServicePort(Optional.of(0));
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
index 24a32a0..bbc70d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -113,6 +113,7 @@ public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest {
/***** Start Broker 2 ******/
ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ed6db6a..3ccf925 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -157,6 +157,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
protected void doInitConf() throws Exception {
+ this.conf.setBrokerShutdownTimeoutMs(0L);
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
@@ -243,6 +244,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
protected void stopBroker() throws Exception {
log.info("Stopping Pulsar broker. brokerServiceUrl: {} webServiceAddress: {}", pulsar.getBrokerServiceUrl(),
pulsar.getWebServiceAddress());
+ // set shutdown timeout to 0 for forceful shutdown
+ pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsar.close();
pulsar = null;
// Simulate cleanup of ephemeral nodes
@@ -276,6 +279,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
+ conf.setBrokerShutdownTimeoutMs(0L);
PulsarService pulsar = spy(new PulsarService(conf));
setupBrokerMocks(pulsar);
pulsar.start();
@@ -431,6 +435,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
configuration.setZookeeperServers("localhost:2181");
configuration.setConfigurationStoreServers("localhost:3181");
configuration.setAllowAutoTopicCreationType("non-partitioned");
+ configuration.setBrokerShutdownTimeoutMs(0L);
configuration.setBrokerServicePort(Optional.of(0));
configuration.setBrokerServicePortTls(Optional.of(0));
configuration.setWebServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index cbb8e40..a827912 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -111,6 +111,7 @@ public class AntiAffinityNamespaceGroupTest {
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config1.setBrokerShutdownTimeoutMs(0L);
config1.setBrokerServicePort(Optional.of(0));
config1.setFailureDomainsEnabled(true);
config1.setLoadBalancerEnabled(true);
@@ -131,6 +132,7 @@ public class AntiAffinityNamespaceGroupTest {
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config2.setBrokerShutdownTimeoutMs(0L);
config2.setBrokerServicePort(Optional.of(0));
config2.setFailureDomainsEnabled(true);
config2.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 581531d..5f0a1fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -67,6 +67,7 @@ public class LeaderElectionServiceTest {
PulsarAdminException {
final String clusterName = "elect-test";
ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setClusterName(clusterName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 84750da..83c8e7e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -141,6 +141,7 @@ public class LoadBalancerTest {
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress(localhost+i);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 3c6d0f4..d9ebda7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -147,6 +147,7 @@ public class ModularLoadManagerImplTest {
config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config1.setAdvertisedAddress("localhost");
+ config1.setBrokerShutdownTimeoutMs(0L);
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
@@ -164,6 +165,7 @@ public class ModularLoadManagerImplTest {
config2.setWebServicePort(Optional.of(0));
config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config2.setAdvertisedAddress("localhost");
+ config2.setBrokerShutdownTimeoutMs(0L);
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
@@ -593,6 +595,7 @@ public class ModularLoadManagerImplTest {
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
PulsarService pulsar = new PulsarService(config);
// create znode using different zk-session
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index cced810..4307955 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -122,6 +122,7 @@ public class SimpleLoadManagerImplTest {
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
config1.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config1.setBrokerShutdownTimeoutMs(0L);
config1.setBrokerServicePort(Optional.of(0));
config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config1.setBrokerServicePortTls(Optional.of(0));
@@ -140,6 +141,7 @@ public class SimpleLoadManagerImplTest {
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
config2.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config2.setBrokerShutdownTimeoutMs(0L);
config2.setBrokerServicePort(Optional.of(0));
config2.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config2.setBrokerServicePortTls(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index ec9fde5..c1d15eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -109,6 +109,7 @@ public class OwnerShipForCurrentServerTestBase {
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+ conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index c5e11c8..592bfff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -47,6 +47,7 @@ public class AdvertisedAddressTest {
bkEnsemble.start();
ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerShutdownTimeoutMs(0L);
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config.setWebServicePort(Optional.ofNullable(0));
config.setClusterName("usc");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 1be4aea..c310196 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -82,6 +82,7 @@ public class BacklogQuotaManagerTest {
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setClusterName("usc");
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index c3e2fe2..2140190 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -77,6 +77,7 @@ public abstract class BkEnsemblesTestBase extends TestRetrySupport {
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setClusterName("usc");
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index a984bac..b30150c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -146,6 +146,7 @@ public class BrokerBookieIsolationTest {
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
@@ -290,6 +291,7 @@ public class BrokerBookieIsolationTest {
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
@@ -414,6 +416,7 @@ public class BrokerBookieIsolationTest {
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
new file mode 100644
index 0000000..b978507
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link GracefulExecutorServicesShutdown} driven by mocked {@link ExecutorService}
+ * instances, so that termination, timeouts and interruption can be simulated deterministically.
+ */
+public class GracefulExecutorServicesShutdownTest {
+
+    /**
+     * An executor that already reports itself as terminated must complete the returned future
+     * without {@code shutdownNow()} ever being called.
+     */
+    @Test
+    public void shouldShutdownExecutorsImmediately() throws ExecutionException, InterruptedException, TimeoutException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isTerminated()).thenReturn(true);
+        when(executorService.isShutdown()).thenReturn(true);
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+
+        // then
+        verify(executorService, atLeastOnce()).shutdown();
+        future.get(1, TimeUnit.SECONDS);
+        verify(executorService, never()).shutdownNow();
+        assertTrue(future.isDone());
+    }
+
+    /**
+     * An executor that only terminates once {@code shutdownNow()} is called must be terminated
+     * forcefully after the configured 500 ms timeout has elapsed.
+     */
+    @Test
+    public void shouldTerminateExecutorOnTimeout() throws ExecutionException, InterruptedException, TimeoutException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        shutdown.timeout(Duration.ofMillis(500));
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(true);
+        AtomicBoolean terminated = new AtomicBoolean();
+        when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get());
+        when(executorService.shutdownNow()).thenAnswer(invocation -> {
+            terminated.set(true);
+            return null;
+        });
+        // the mock blocks for the full requested time and still reports "not terminated"
+        when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation -> {
+            long timeout = invocation.getArgument(0);
+            TimeUnit unit = invocation.getArgument(1);
+            Thread.sleep(unit.toMillis(timeout));
+            return terminated.get();
+        });
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+
+        // then
+        future.get(1, TimeUnit.SECONDS);
+        verify(executorService, atLeastOnce()).shutdownNow();
+        assertTrue(future.isDone());
+    }
+
+    /**
+     * An executor that terminates while it is being awaited must complete the future after a
+     * single {@code awaitTermination} call and without {@code shutdownNow()}.
+     */
+    @Test
+    public void shouldWaitForExecutorToTerminate() throws ExecutionException, InterruptedException, TimeoutException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        shutdown.timeout(Duration.ofMillis(500));
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(true);
+        AtomicBoolean terminated = new AtomicBoolean();
+        when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get());
+        when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation -> {
+            long timeout = invocation.getArgument(0);
+            // wait half the time to simulate the termination completing
+            timeout = timeout / 2;
+            TimeUnit unit = invocation.getArgument(1);
+            Thread.sleep(unit.toMillis(timeout));
+            terminated.set(true);
+            return terminated.get();
+        });
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+
+        // then
+        future.get(1, TimeUnit.SECONDS);
+        verify(executorService, times(1)).awaitTermination(anyLong(), any());
+        verify(executorService, never()).shutdownNow();
+        assertTrue(future.isDone());
+    }
+
+
+    /**
+     * Cancelling the returned future must interrupt the pending {@code awaitTermination} call and
+     * lead to exactly one {@code shutdownNow()} invocation.
+     */
+    @Test
+    public void shouldTerminateWhenFutureIsCancelled() throws InterruptedException {
+        // given
+        GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate();
+        shutdown.timeout(Duration.ofMillis(15000));
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(true);
+        AtomicBoolean terminated = new AtomicBoolean();
+        AtomicBoolean awaitTerminationInterrupted = new AtomicBoolean();
+        // signals that the shutdown logic has actually entered awaitTermination
+        CountDownLatch awaitTerminationEntered = new CountDownLatch(1);
+        when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get());
+        when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation -> {
+            long timeout = invocation.getArgument(0);
+            TimeUnit unit = invocation.getArgument(1);
+            awaitTerminationEntered.countDown();
+            try {
+                Thread.sleep(unit.toMillis(timeout));
+            } catch (InterruptedException e) {
+                awaitTerminationInterrupted.set(true);
+                Thread.currentThread().interrupt();
+                throw e;
+            }
+            throw new IllegalStateException("Thread.sleep should have been interrupted");
+        });
+        when(executorService.shutdownNow()).thenAnswer(invocation -> {
+            terminated.set(true);
+            return null;
+        });
+
+        // when
+        shutdown.shutdown(executorService);
+        CompletableFuture<Void> future = shutdown.handle();
+        // Cancelling before awaitTermination has been entered would interrupt the waiting logic
+        // too early: awaitTermination would never be called and the assertions below would fail
+        // intermittently. Wait until the call is in progress before cancelling.
+        assertTrue(awaitTerminationEntered.await(5, TimeUnit.SECONDS),
+                "awaitTermination should have been called before cancelling the future");
+        future.cancel(false);
+
+        // then
+        // NOTE(review): assumes cancel() returns only after the shutdown logic has finished - confirm.
+        assertTrue(awaitTerminationInterrupted.get(),
+                "awaitTermination should have been interrupted");
+        verify(executorService, times(1)).awaitTermination(anyLong(), any());
+        verify(executorService, times(1)).shutdownNow();
+    }
+
+    /**
+     * Passing a {@code null} executor reference must be tolerated; the returned future is
+     * already done.
+     */
+    @Test
+    public void shouldAcceptNullReferenceAndIgnoreIt() {
+        ExecutorService executorService = null;
+        CompletableFuture<Void> future = GracefulExecutorServicesShutdown.initiate()
+                .shutdown(executorService)
+                .handle();
+        assertTrue(future.isDone());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index c91f5da..b0dcb83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -63,6 +63,7 @@ public class MaxMessageSizeTest {
configuration.setAdvertisedAddress("localhost");
configuration.setWebServicePort(Optional.of(0));
configuration.setClusterName("max_message_test");
+ configuration.setBrokerShutdownTimeoutMs(0L);
configuration.setBrokerServicePort(Optional.of(0));
configuration.setAuthorizationEnabled(false);
configuration.setAuthenticationEnabled(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index f524b98..e715738 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -112,6 +112,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@BeforeMethod
public void setup() throws Exception {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
pulsar = spy(new PulsarService(svcConfig));
doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 4179a63..875566f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -79,6 +80,8 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
public void setup(Method m) throws Exception {
super.setUp(m);
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
+ @Cleanup
PulsarService pulsar = spy(new PulsarService(svcConfig));
doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 4865613..f09e31f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -164,6 +164,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
executor = OrderedExecutor.newBuilder().numThreads(1).build();
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
svcConfig.setAdvertisedAddress("localhost");
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
pulsar = spy(new PulsarService(svcConfig));
doReturn(svcConfig).when(pulsar).getConfiguration();
doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 20b98e0..aeaacf9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -209,6 +209,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
config.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 88ec8da..c5a9335 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -152,6 +152,7 @@ public class ServerCnxTest {
public void setup() throws Exception {
executor = OrderedExecutor.newBuilder().numThreads(1).build();
svcConfig = spy(new ServiceConfiguration());
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
pulsar = spy(new PulsarService(svcConfig));
doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 7e47559..62ec5f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -95,6 +95,7 @@ public class TopicOwnerTest {
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 455a3f6..bfa678a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -110,6 +110,7 @@ public class PersistentSubscriptionTest {
executor = Executors.newSingleThreadExecutor();
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setTransactionCoordinatorEnabled(true);
pulsarMock = spy(new PulsarService(svcConfig));
doReturn(new InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 8fc1fd1..62b0aed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -119,6 +119,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+ conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 16cc475..0457907 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -64,6 +64,7 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index c37d24a..24dd62d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -337,6 +337,7 @@ public class WebServiceTest {
ServiceConfiguration config = new ServiceConfiguration();
config.setAdvertisedAddress("localhost");
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
if (enableTls) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index ae89a55..383cd08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -135,6 +135,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
/**** start broker-2 ****/
ServiceConfiguration conf2 = new ServiceConfiguration();
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -212,6 +213,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
final String property = "my-property2";
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -301,6 +303,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
/**** start broker-2 ****/
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -376,7 +379,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
/**** start broker-2 ****/
ServiceConfiguration conf2 = new ServiceConfiguration();
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setAdvertisedAddress("localhost");
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
conf2.setBrokerServicePortTls(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
@@ -810,6 +815,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
// (1) Start broker-1
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -913,7 +919,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
final String namespace = "my-property/my-ns";
// (1) Start broker-1
ServiceConfiguration conf2 = new ServiceConfiguration();
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setAdvertisedAddress("localhost");
+ conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index a3ec437..18de76e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -81,6 +81,7 @@ public class ClientDeduplicationFailureTest {
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setTlsAllowInsecureConnection(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 74ca556..deaee35 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -941,6 +941,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+ config1.setBrokerShutdownTimeoutMs(0L);
config1.setBrokerServicePort(Optional.of(0));
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setAllowAutoTopicCreationType("non-partitioned");
@@ -966,6 +967,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+ config2.setBrokerShutdownTimeoutMs(0L);
config2.setBrokerServicePort(Optional.of(0));
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setAllowAutoTopicCreationType("non-partitioned");
@@ -991,6 +993,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+ config3.setBrokerShutdownTimeoutMs(0L);
config3.setBrokerServicePort(Optional.of(0));
config3.setAllowAutoTopicCreationType("non-partitioned");
pulsar3 = new PulsarService(config3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 4493698..2b358dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -106,6 +106,7 @@ public class ServiceUrlProviderTest extends ProducerConsumerBase {
.subscribe();
PulsarService pulsarService1 = pulsar;
+ conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
conf.setWebServicePort(Optional.of(0));
restartBroker();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 7085832..87a664f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -129,6 +129,7 @@ public class PulsarFunctionE2ESecurityTest {
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 365b81f..5dc5197 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -191,6 +191,7 @@ public class PulsarFunctionLocalRunTest {
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index b6474b6..f21aabeb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -126,6 +126,7 @@ public class PulsarFunctionPublishTest {
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 61e7e61..e23c25e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -89,6 +89,7 @@ public class PulsarFunctionTlsTest {
int webPort = PortManager.nextFreePort();
ServiceConfiguration config = new ServiceConfiguration();
+ config.setBrokerShutdownTimeoutMs(0L);
config.setWebServicePort(Optional.empty());
config.setWebServicePortTls(Optional.of(webPort));
config.setBrokerServicePort(Optional.empty());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 2c2e333..b288911 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -92,6 +92,7 @@ public class PulsarWorkerAssignmentTest {
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 2bee28b..d5d83bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -127,6 +127,7 @@ public abstract class AbstractPulsarE2ETest {
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 6d41c33..63f57f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -100,6 +100,7 @@ public class PulsarFunctionAdminTest {
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 6dad9a4..ac96b1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -107,6 +107,7 @@ public class PulsarFunctionTlsTest {
bkEnsemble.start();
config = spy(new ServiceConfiguration());
+ config.setBrokerShutdownTimeoutMs(0L);
config.setClusterName("use");
Set<String> superUsers = Sets.newHashSet("superUser", "admin");
config.setSuperUserRoles(superUsers);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d53ecfe..ab1d44e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7902178..8033c18 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandler.java
similarity index 74%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandler.java
index 1deb387..f2de511 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandler.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.util;
import java.util.Objects;
import java.util.concurrent.CancellationException;
@@ -41,10 +41,16 @@ import java.util.function.BiConsumer;
* any "downstream" dependent futures. A cancellation or timeout that happens in any "upstream"
* future will get handled.
*/
-class CompletableFutureCancellationHandler {
- private volatile boolean cancelled;
+public class CompletableFutureCancellationHandler {
+ private enum CompletionStatus {
+ PENDING,
+ CANCELLED,
+ DONE
+ }
+ private volatile CompletionStatus completionStatus = CompletionStatus.PENDING;
private volatile Runnable cancelAction;
private final AtomicBoolean cancelHandled = new AtomicBoolean();
+ private boolean attached;
/**
* Creates a new {@link CompletableFuture} and attaches the cancellation handler
@@ -61,12 +67,15 @@ class CompletableFutureCancellationHandler {
/**
* Attaches the cancellation handler to handle cancels
- * and timeouts
+ * and timeouts. A cancellation handler instance can be used only once.
*
* @param future the future to attach the handler to
- * @param <T> the result type of the future
*/
- public <T> void attachToFuture(CompletableFuture<T> future) {
+ public synchronized void attachToFuture(CompletableFuture<?> future) {
+ if (attached) {
+ throw new IllegalStateException("A future has already been attached to this instance.");
+ }
+ attached = true;
future.whenComplete(whenCompleteFunction());
}
@@ -81,7 +90,7 @@ class CompletableFutureCancellationHandler {
* @param cancelAction the action to run when the future gets cancelled or times out
*/
public void setCancelAction(Runnable cancelAction) {
- if (this.cancelAction != null) {
+ if (this.cancelAction != null || cancelHandled.get()) {
throw new IllegalStateException("cancelAction can only be set once.");
}
this.cancelAction = Objects.requireNonNull(cancelAction);
@@ -89,18 +98,25 @@ class CompletableFutureCancellationHandler {
runCancelActionOnceIfCancelled();
}
- private <T> BiConsumer<? super T, ? super Throwable> whenCompleteFunction() {
- return (T t, Throwable throwable) -> {
+ private BiConsumer<Object, ? super Throwable> whenCompleteFunction() {
+ return (v, throwable) -> {
if (throwable instanceof CancellationException || throwable instanceof TimeoutException) {
- cancelled = true;
+ completionStatus = CompletionStatus.CANCELLED;
+ } else {
+ completionStatus = CompletionStatus.DONE;
}
runCancelActionOnceIfCancelled();
};
}
private void runCancelActionOnceIfCancelled() {
- if (cancelled && cancelAction != null && cancelHandled.compareAndSet(false, true)) {
- cancelAction.run();
+ if (completionStatus != CompletionStatus.PENDING && cancelAction != null &&
+ cancelHandled.compareAndSet(false, true)) {
+ if (completionStatus == CompletionStatus.CANCELLED) {
+ cancelAction.run();
+ }
+ // clear cancel action reference when future completes
+ cancelAction = null;
}
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 53b6deb..7356950 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -36,13 +36,52 @@ public class FutureUtil {
/**
* Return a future that represents the completion of the futures in the provided list.
*
- * @param futures
- * @return
+ * @param futures futures to wait for
+ * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
*/
- public static <T> CompletableFuture<Void> waitForAll(List<CompletableFuture<T>> futures) {
+ public static CompletableFuture<Void> waitForAll(List<? extends CompletableFuture<?>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
+
+ /**
+ * Return a future that represents the completion of the futures in the provided list.
+ * The returned future supports {@link CompletableFuture#cancel(boolean)}: when it is cancelled
+ * or times out, all futures in the list that have not yet completed are cancelled as well.
+ *
+ * @param futures futures to wait for
+ * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
+ */
+ public static CompletableFuture<Void> waitForAllAndSupportCancel(List<? extends CompletableFuture<?>> futures) {
+ CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[0]);
+ CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futuresArray);
+ whenCancelledOrTimedOut(combinedFuture, () -> {
+ for (CompletableFuture completableFuture : futuresArray) {
+ if (!completableFuture.isDone()) {
+ completableFuture.cancel(false);
+ }
+ }
+ });
+ return combinedFuture;
+ }
+
+ /**
+ * If the future is cancelled or times out, the cancel action will be
+ * invoked.
+ *
+ * <p>The action is executed once if the future completes with
+ * {@link java.util.concurrent.CancellationException} or {@link TimeoutException}.
+ *
+ * @param future future to attach the action to
+ * @param cancelAction action to invoke if the future is cancelled or times out
+ */
+ public static void whenCancelledOrTimedOut(CompletableFuture<?> future, Runnable cancelAction) {
+ CompletableFutureCancellationHandler cancellationHandler =
+ new CompletableFutureCancellationHandler();
+ cancellationHandler.setCancelAction(cancelAction);
+ cancellationHandler.attachToFuture(future);
+ }
+
public static <T> CompletableFuture<T> failedFuture(Throwable t) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(t);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index 8e10bcc..a451dff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -34,6 +34,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
@SuppressWarnings("checkstyle:JavadocType")
@@ -86,4 +87,13 @@ public class EventLoopUtil {
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}
+
+ /**
+ * Shuts down the EventLoopGroup gracefully and exposes the result as a {@link CompletableFuture}.
+ * @param eventLoopGroup the event loop to shutdown
+ * @return CompletableFuture that completes when the shutdown has completed
+ */
+ public static CompletableFuture<Void> shutdownGracefully(EventLoopGroup eventLoopGroup) {
+ return NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully());
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
new file mode 100644
index 0000000..df9d24f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.util.concurrent.Future;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Contains utility methods for working with Netty Futures.
+ */
+public class NettyFutureUtil {
+ /**
+ * Converts a Netty {@link Future} to a {@link CompletableFuture}.
+ *
+ * @param future Netty future
+ * @param <V> value type
+ * @return converted future instance
+ */
+ public static <V> CompletableFuture<V> toCompletableFuture(Future<V> future) {
+ Objects.requireNonNull(future, "future cannot be null");
+
+ CompletableFuture<V> adapter = new CompletableFuture<>();
+ if (future.isDone()) {
+ if (future.isSuccess()) {
+ adapter.complete(future.getNow());
+ } else {
+ adapter.completeExceptionally(future.cause());
+ }
+ } else {
+ future.addListener((Future<V> f) -> {
+ if (f.isSuccess()) {
+ adapter.complete(f.getNow());
+ } else {
+ adapter.completeExceptionally(f.cause());
+ }
+ });
+ }
+ return adapter;
+ }
+
+ /**
+ * Converts a Netty {@link Future} to a {@link CompletableFuture} of type {@code Void}, discarding the result value.
+ *
+ * @param future Netty future
+ * @return converted future instance
+ */
+ public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> future) {
+ Objects.requireNonNull(future, "future cannot be null");
+
+ CompletableFuture<Void> adapter = new CompletableFuture<>();
+ if (future.isDone()) {
+ if (future.isSuccess()) {
+ adapter.complete(null);
+ } else {
+ adapter.completeExceptionally(future.cause());
+ }
+ } else {
+ future.addListener(f -> {
+ if (f.isSuccess()) {
+ adapter.complete(null);
+ } else {
+ adapter.completeExceptionally(f.cause());
+ }
+ });
+ }
+ return adapter;
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandlerTest.java
similarity index 98%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandlerTest.java
index 9582b73..337e607 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompletableFutureCancellationHandlerTest.java
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl;
-
-import org.testng.annotations.Test;
+package org.apache.pulsar.common.util;
+import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
public class CompletableFutureCancellationHandlerTest {