Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/13 12:22:38 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #10199: Shutdown Broker gracefully, but forcefully after brokerShutdownTimeoutMs

codelipenghui commented on a change in pull request #10199:
URL: https://github.com/apache/pulsar/pull/10199#discussion_r612370039



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -694,60 +705,192 @@ public void close() throws IOException {
                 }
             });
 
-            List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
+            CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
+            CompletableFuture<Void> shutdownFuture =
+                    CompletableFuture.allOf(shutdownEventLoopGracefully(acceptorGroup),
+                            shutdownEventLoopGracefully(workerGroup))
+                            .handle((v, t) -> {
+                                if (t != null) {
+                                    log.warn("Error shutting down event loops gracefully", t);
+                                } else {
+                                    log.info("Event loops shutdown completed.");
+                                }
+                                return null;
+                            })
+                            .thenCompose(__ -> {
+                                log.info("Continuing to second phase in shutdown.");
 
-            if (listenChannel != null && listenChannel.isOpen()) {
-                asyncCloseFutures.add(closeChannel(listenChannel));
-            }
+                                List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
 
-            if (listenChannelTls != null && listenChannelTls.isOpen()) {
-                asyncCloseFutures.add(closeChannel(listenChannelTls));
-            }
+                                if (listenChannel != null && listenChannel.isOpen()) {
+                                    asyncCloseFutures.add(closeChannel(listenChannel));
+                                }
+
+                                if (listenChannelTls != null && listenChannelTls.isOpen()) {
+                                    asyncCloseFutures.add(closeChannel(listenChannelTls));
+                                }
+
+                                if (interceptor != null) {
+                                    interceptor.close();
+                                    interceptor = null;
+                                }
 
-            acceptorGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
+                                try {
+                                    authenticationService.close();
+                                } catch (IOException e) {
+                                    log.warn("Error in closing authenticationService", e);
+                                }
+                                pulsarStats.close();
+                                ClientCnxnAspect.removeListener(zkStatsListener);
+                                ClientCnxnAspect.registerExecutor(null);
+                                try {
+                                    delayedDeliveryTrackerFactory.close();
+                                } catch (IOException e) {
+                                    log.warn("Error in closing delayedDeliveryTrackerFactory", e);
+                                }
 
-            if (interceptor != null) {
-                interceptor.close();
-                interceptor = null;
+                                asyncCloseFutures.add(GracefulExecutorServiceShutdownHandler
+                                        .shutdownGracefully(
+                                                (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
+                                                        * pulsar.getConfiguration().getBrokerShutdownTimeoutMs()),
+                                                statsUpdater,
+                                                inactivityMonitor,
+                                                messageExpiryMonitor,
+                                                compactionMonitor,
+                                                messagePublishBufferMonitor,
+                                                consumedLedgersMonitor,
+                                                backlogQuotaChecker,
+                                                topicOrderedExecutor,
+                                                topicPublishRateLimiterMonitor,
+                                                brokerPublishRateLimiterMonitor,
+                                                deduplicationSnapshotMonitor));
+
+                                CompletableFuture<Void> combined =
+                                        FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures);
+                                cancellableDownstreamFutureReference.complete(combined);
+                                combined.handle((v, t) -> {
+                                    if (t == null) {
+                                        log.info("Broker service completely shut down");
+                                    } else {
+                                        if (t instanceof CancellationException) {
+                                            log.warn("Broker service didn't complete gracefully. "
+                                                    + "Terminating Broker service.");
+                                        } else {
+                                            log.warn("Broker service shut down completed with exception", t);
+                                        }
+                                    }
+                                    return null;
+                                });
+                                return combined;
+                            });
+            FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference
+                    .thenAccept(future -> future.cancel(false)));
+            return shutdownFuture;
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
+    }
+
+    CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) {
+        long brokerShutdownTimeoutMs = pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
+        long quietPeriod = Math.min((long) (
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs),
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
+        long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+        return NettyFutureUtil.toCompletableFutureVoid(
+                eventLoopGroup.shutdownGracefully(quietPeriod,
+                        timeout, TimeUnit.MILLISECONDS));
+    }
+
+    @Slf4j
+    private static class GracefulExecutorServiceShutdownHandler {
+        private final ScheduledExecutorService shutdownScheduler = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory(getClass().getSimpleName()));
+        private final List<ExecutorService> executors;
+        private final CompletableFuture<Void> future;
+        private final long timeoutMs;
+
+        private GracefulExecutorServiceShutdownHandler(long timeoutMs, ExecutorService... executorServices) {
+            this.timeoutMs = timeoutMs;
+            executors = Arrays.stream(executorServices)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+            future = new CompletableFuture<>();
+        }
+
+        static CompletableFuture<Void> shutdownGracefully(long timeoutMs, ExecutorService... executorServices) {
+            return new GracefulExecutorServiceShutdownHandler(timeoutMs, executorServices).doShutdownGracefully();
+        }
+
+        private CompletableFuture<Void> doShutdownGracefully() {
+            log.info("Shutting down {} executors.", executors.size());
+            executors.forEach(ExecutorService::shutdown);
+            FutureUtil.whenCancelledOrTimedOut(future, () -> {
+                terminate();
+            });
+            checkCompletion();
+            if (!shutdownScheduler.isShutdown()) {
+                try {
+                    shutdownScheduler.schedule(this::terminate, timeoutMs, TimeUnit.MILLISECONDS);
+                } catch (RejectedExecutionException e) {
+                    // ignore
+                }
             }
+            return future;
+        }
 
-            statsUpdater.shutdown();
-            inactivityMonitor.shutdown();
-            messageExpiryMonitor.shutdown();
-            compactionMonitor.shutdown();
-            messagePublishBufferMonitor.shutdown();
-            consumedLedgersMonitor.shutdown();
-            backlogQuotaChecker.shutdown();
-            authenticationService.close();
-            pulsarStats.close();
-            ClientCnxnAspect.removeListener(zkStatsListener);
-            ClientCnxnAspect.registerExecutor(null);
-            topicOrderedExecutor.shutdown();
-            delayedDeliveryTrackerFactory.close();
-            if (topicPublishRateLimiterMonitor != null) {
-                topicPublishRateLimiterMonitor.shutdown();
+        private void terminate() {
+            for (ExecutorService executor : executors) {
+                if (!executor.isTerminated()) {
+                    log.info("Shutting down forcefully executor {}", executor);
+                    for (Runnable runnable : executor.shutdownNow()) {
+                        log.info("Execution in progress for runnable instance of {}: {}", runnable.getClass(),
+                                runnable);
+                    }
+                }
             }
-            if (brokerPublishRateLimiterMonitor != null) {
-                brokerPublishRateLimiterMonitor.shutdown();
+            shutdown();
+        }
+
+        private void shutdown() {
+            if (!shutdownScheduler.isShutdown()) {
+                log.info("Shutting down scheduler.");
+                shutdownScheduler.shutdown();
             }
-            if (deduplicationSnapshotMonitor != null) {
-                deduplicationSnapshotMonitor.shutdown();
+        }
+
+        private void scheduleCheck() {
+            if (!shutdownScheduler.isShutdown()) {
+                try {
+                    shutdownScheduler
+                            .schedule(this::checkCompletion, Math.max(timeoutMs / 100, 10), TimeUnit.MILLISECONDS);

Review comment:
       Why is `timeoutMs / 100` needed here? Shouldn't it be `timeoutMs`?
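For context on this question: in the diff, `Math.max(timeoutMs / 100, 10)` appears to be only the interval between completion checks (`scheduleCheck()` re-running `checkCompletion`), while the forced `terminate()` is scheduled separately at the full `timeoutMs`. Below is a minimal sketch of that reading. `PollingShutdownSketch` is a hypothetical name, and it uses `scheduleWithFixedDelay` in place of the diff's self-rescheduling check, so treat it as an illustration rather than the PR's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified sketch (not the PR's code): poll often for completion,
// but only force termination once the full timeout has elapsed.
final class PollingShutdownSketch {
    static CompletableFuture<Void> shutdownGracefully(long timeoutMs, List<ExecutorService> executors) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "shutdown-poller");
            t.setDaemon(true); // never keep the JVM alive just for this helper
            return t;
        });
        CompletableFuture<Void> done = new CompletableFuture<>();
        executors.forEach(ExecutorService::shutdown); // stop accepting work, let queued tasks finish

        // Poll interval: 1% of the timeout, but at least 10 ms, so a clean early
        // termination is noticed promptly instead of only at the deadline.
        long pollMs = Math.max(timeoutMs / 100, 10);
        scheduler.scheduleWithFixedDelay(() -> {
            if (executors.stream().allMatch(ExecutorService::isTerminated)) {
                done.complete(null);
            }
        }, 0, pollMs, TimeUnit.MILLISECONDS);

        // Deadline: once the full timeout has passed, interrupt whatever is still running.
        scheduler.schedule(() -> {
            executors.forEach(ExecutorService::shutdownNow);
        }, timeoutMs, TimeUnit.MILLISECONDS);

        // Tear down the helper scheduler once every executor has terminated.
        done.whenComplete((v, t) -> scheduler.shutdownNow());
        return done;
    }
}
```

Under this reading, replacing the poll interval with `timeoutMs` would make even an idle broker wait the whole timeout before the future completes, because the first successful check would coincide with the forced termination.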

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -308,10 +310,17 @@ public void close() throws PulsarServerException {
         try {
             closeAsync().get();
         } catch (ExecutionException e) {
-            if (e.getCause() instanceof PulsarServerException) {
-                throw (PulsarServerException) e.getCause();
+            Throwable cause = e.getCause();
+            if (cause instanceof PulsarServerException) {
+                throw (PulsarServerException) cause;
+            } else if (cause instanceof TimeoutException) {
+                if (getConfiguration().getBrokerShutdownTimeoutMs() < 1000L) {
+                    // ignore shutdown timeout when it's less than 1000ms (in tests)

Review comment:
       Shouldn't the behavior be consistent between the Pulsar server and the tests? If users configure a shutdown timeout of less than 1000 ms, what is the expected behavior?
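One possible direction for this question, sketched with hypothetical names: unwrap the `ExecutionException` and treat a `TimeoutException` cause the same way for every configured `brokerShutdownTimeoutMs`, so tests that want to tolerate a slow shutdown would have to handle the exception explicitly instead of relying on a 1000 ms threshold. `UniformShutdownTimeout` and `ShutdownException` are illustrative stand-ins (the latter for `PulsarServerException`); what the real `close()` should do on timeout is not settled in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: no special case for small timeouts; a timeout is always surfaced.
final class UniformShutdownTimeout {
    // Stand-in for PulsarServerException, which is not reproduced here.
    static final class ShutdownException extends Exception {
        ShutdownException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // closeFuture plays the role of closeAsync(); it may itself fail with a TimeoutException.
    static void close(CompletableFuture<Void> closeFuture) throws ShutdownException, InterruptedException {
        try {
            closeFuture.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof TimeoutException) {
                // Same handling whether the configured timeout is 100 ms or 60 s.
                throw new ShutdownException("Shutdown timed out", cause);
            }
            throw new ShutdownException("Shutdown failed", cause);
        }
    }
}
```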

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
##########
@@ -52,30 +49,15 @@ public void run() {
                     + service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl());
         }
 
-        ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));
-
         try {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-
-            executor.execute(() -> {
-                try {
-                    service.closeAsync().whenComplete((result, throwable) -> {
-                        if (throwable != null) {
-                            future.completeExceptionally(throwable);
-                        } else {
-                            future.complete(result);
-                        }
-                    });
-                } catch (Exception e) {
-                    future.completeExceptionally(e);
-                }
-            });
-
-            future.get(service.getConfiguration().getBrokerShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
-
+            service.closeAsync().get();

Review comment:
       Why not use the "shutdown-thread" here? And if we don't apply the future timeout, can `processTerminator.accept(0)` still work? The thread might block forever on the `get` method.
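The concern can be illustrated with a sketch in which `closeAsync()` runs on a dedicated "shutdown-thread" and the wait is bounded by the configured timeout, so `processTerminator.accept(0)` is reached even if closing never completes. `BoundedShutdownHook`, its constructor, and the use of a `Supplier` and an `IntConsumer` in place of the real `PulsarService` and process terminator are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

// Hypothetical sketch, not the PR's code: close on a dedicated "shutdown-thread",
// bound the wait, and always reach the process terminator afterwards.
final class BoundedShutdownHook implements Runnable {
    private final Supplier<CompletableFuture<Void>> closeAsync; // stand-in for service::closeAsync
    private final long shutdownTimeoutMs;                       // stand-in for brokerShutdownTimeoutMs
    private final IntConsumer processTerminator;                // stand-in for the hook's terminator

    BoundedShutdownHook(Supplier<CompletableFuture<Void>> closeAsync, long shutdownTimeoutMs,
                        IntConsumer processTerminator) {
        this.closeAsync = closeAsync;
        this.shutdownTimeoutMs = shutdownTimeoutMs;
        this.processTerminator = processTerminator;
    }

    @Override
    public void run() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "shutdown-thread");
            t.setDaemon(true); // a hung close must not keep the JVM alive
            return t;
        });
        try {
            // closeAsync() itself runs on "shutdown-thread"; this thread only waits.
            CompletableFuture<Void> closed = CompletableFuture
                    .supplyAsync(closeAsync, executor)
                    .thenCompose(f -> f);
            // Bounded wait: a close that never completes cannot block the hook forever.
            closed.get(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.err.println("Shutdown did not finish within " + shutdownTimeoutMs + " ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            System.err.println("Shutdown failed: " + e);
        } finally {
            executor.shutdownNow();
        }
        // Reached in every case above, so the process always terminates.
        processTerminator.accept(0);
    }
}
```

With an unbounded `get()`, the same hook would never reach the terminator if `closeAsync()` returned a future that is never completed.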




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org