Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/12 09:46:14 UTC

[GitHub] [pulsar] lhotari commented on a diff in pull request #15638: [fix] Ensure resources are disposed in a sync way while closing ManagedLedgerFactory and CoordinationService

lhotari commented on code in PR #15638:
URL: https://github.com/apache/pulsar/pull/15638#discussion_r918754279


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -597,6 +601,8 @@ private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> futur
                 Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
                 shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
         future.handle((v, t) -> {
+            LOG.info("Shutdown timed out after {} ms", getConfiguration().getBrokerShutdownTimeoutMs());
+            LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());

Review Comment:
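   Does this need an `if (t != null) {` guard? `handle` runs on both normal and exceptional completion, so as written the timeout message and the thread dump would be logged on every close.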
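
   A minimal sketch of such a guard, assuming the intent is to log diagnostics only when the close future fails. `GuardedHandleSketch`, `logOnlyOnFailure`, and the `LOGGED` list are hypothetical names used for illustration, not Pulsar code, and the sketch logs on any failure rather than checking specifically for a timeout:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class GuardedHandleSketch {

    /** Collects messages instead of calling a real logger, so the sketch stays self-contained. */
    static final List<String> LOGGED = new ArrayList<>();

    /** Hypothetical helper: records a diagnostic message only if the future failed. */
    static CompletableFuture<Void> logOnlyOnFailure(CompletableFuture<Void> future, long timeoutMs) {
        return future.handle((v, t) -> {
            // handle() is invoked for normal and exceptional completion alike,
            // so the diagnostics are guarded by a null check on the throwable.
            if (t != null) {
                LOGGED.add("Shutdown timed out after " + timeoutMs + " ms");
            }
            return null;
        });
    }
}
```

   With this shape, a future that completes normally leaves `LOGGED` empty, while one completed exceptionally adds a single entry.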



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java:
##########
@@ -249,20 +252,20 @@ public void setup() throws Exception {
 
     @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception {
-        metadataStore.close();
-        brokerService.getTopics().clear();
-        brokerService.close(); //to clear pulsarStats
-        try {
-            pulsar.close();
-        } catch (Exception e) {
-            log.warn("Failed to close pulsar service", e);
-            throw e;
-        }
-
-        executor.shutdownNow();
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownGracefully().get();
-        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerService.closeAsync());
+        futures.add(pulsar.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   This doesn't look right.
   
   What is the intention of calling `.sync()`? It blocks until the shutdown has completed before proceeding, so the future isn't needed at all in that case, and the `.addListener` call that follows serves no purpose.
   
   To adapt a Netty Future to a CompletableFuture without blocking, there's [org.apache.pulsar.common.util.netty.NettyFutureUtil#toCompletableFuture](https://github.com/apache/pulsar/blob/12d43a86d9fb228b498914155bc3a6194874a2a2/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java#L36).
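
   The non-blocking adaptation can be sketched roughly as follows. To keep the example JDK-only, `ListenerFuture` is a hypothetical stand-in for a listener-based future such as Netty's, and `ListenerAdapterSketch.toCompletableFuture` is an illustrative name; this is not the actual `NettyFutureUtil` implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ListenerAdapterSketch {

    /** Hypothetical stand-in for a listener-based future; a null argument means success. */
    static final class ListenerFuture {
        private final List<Consumer<Throwable>> listeners = new ArrayList<>();
        private boolean done;
        private Throwable cause;

        void addListener(Consumer<Throwable> listener) {
            synchronized (this) {
                if (!done) {
                    listeners.add(listener);
                    return;
                }
            }
            // Already complete: notify immediately, outside the lock.
            listener.accept(cause);
        }

        void complete(Throwable failure) {
            List<Consumer<Throwable>> toNotify;
            synchronized (this) {
                if (done) {
                    return;
                }
                done = true;
                cause = failure;
                toNotify = new ArrayList<>(listeners);
                listeners.clear();
            }
            toNotify.forEach(l -> l.accept(failure));
        }
    }

    /** Bridges the listener callback to a CompletableFuture without blocking the caller. */
    static CompletableFuture<Void> toCompletableFuture(ListenerFuture source) {
        CompletableFuture<Void> adapted = new CompletableFuture<>();
        source.addListener(failure -> {
            if (failure == null) {
                adapted.complete(null);
            } else {
                adapted.completeExceptionally(failure);
            }
        });
        return adapted;
    }
}
```

   The caller gets the adapted future back immediately and can add it to the `futures` list alongside the other close futures, instead of blocking in the teardown method.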
   
   



##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesShutdown.java:
##########
@@ -114,4 +121,16 @@ public CompletableFuture<Void> handle() {
         return new GracefulExecutorServicesTerminationHandler(timeout, terminationTimeout,
                 executorServices).getFuture();
     }
+
+    public static CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup,
+                                                                      long brokerShutdownTimeoutMs) {
+        long quietPeriod = Math.min((long) (
+                        GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs),
+                GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS);
+        long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
+        return NettyFutureUtil.toCompletableFutureVoid(
+                eventLoopGroup.shutdownGracefully(quietPeriod,
+                        timeout, TimeUnit.MILLISECONDS));
+    }
+

Review Comment:
   This method doesn't really belong in this class; org.apache.pulsar.common.util.netty.EventLoopUtil would be a better location for it.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java:
##########
@@ -238,12 +240,19 @@ public void teardown() throws Exception {
         if (channel != null) {
             channel.close();
         }
-        pulsar.close();
-        brokerService.close();
-        executor.shutdownNow();
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownGracefully().get();
-        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerService.closeAsync());
+        futures.add(pulsar.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   This doesn't look right. Please see the first comment about this code pattern (`.sync()` followed by `.addListener`).



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java:
##########
@@ -190,12 +195,18 @@ public void testHttpLookupRedirect() throws Exception {
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
-        if (this.executorService != null) {
-            this.executorService.shutdownNow();
-        }
-        if (eventExecutors != null) {
-            eventExecutors.shutdownGracefully();
-        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(pulsar.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executorService)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new CompletableFuture<>();
+        eventExecutors.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   This doesn't look right. Please see the first comment about this code pattern (`.sync()` followed by `.addListener`).



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java:
##########
@@ -208,19 +211,20 @@ public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription
 
     @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception {
-        brokerMock.close(); //to clear pulsarStats
-        try {
-            pulsarMock.close();
-        } catch (Exception e) {
-            log.warn("Failed to close pulsar service", e);
-            throw e;
-        }
-
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(brokerMock.closeAsync());
+        futures.add(pulsarMock.closeAsync());
+        futures.add(GracefulExecutorServicesShutdown.initiate()
+                .timeout(Duration.ZERO)
+                .shutdown(executor)
+                .handle());
+        final CompletableFuture<Void> eventLoopGroupCloseFuture = new CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().sync().addListener(f -> {
+            eventLoopGroupCloseFuture.complete(null);
+        });

Review Comment:
   This doesn't look right. Please see the first comment about this code pattern (`.sync()` followed by `.addListener`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org