You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/17 02:55:08 UTC

[pulsar] branch branch-3.0 updated: [fix][broker] Gracefully shutdown does not work with admin cli in standalone (#20709)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d5efaa03f4e [fix][broker] Gracefully shutdown does not work with admin cli in standalone (#20709)
d5efaa03f4e is described below

commit d5efaa03f4eb45ac95f8b99a6f5a971e9c15b796
Author: Kim, Joo Hyuk <be...@gmail.com>
AuthorDate: Wed Jul 26 15:08:16 2023 +0900

    [fix][broker] Gracefully shutdown does not work with admin cli in standalone (#20709)
    
    Fixes : #20617
    
    ### Motivation
    
    Currently, clients' shutdown API does not behave consistently in sense that asynchronicity is not handled explicitly. So issue #20617 happens.
    
    This PR will allow clients know that shutdown is actually triggered.
    
    ### Modifications
    
    - Synchronize call to `POST /shutdown` on client side
    - Asynchronize explicitly `pulsar().closeAsync()` invocation
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 20 +++++++++----
 .../org/apache/pulsar/client/admin/Brokers.java    |  9 +++---
 .../java/org/apache/pulsar/admin/cli/CmdBase.java  | 34 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdBrokers.java    |  4 +--
 4 files changed, 56 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3328fa9715b..a61a2c940b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -546,16 +546,26 @@ public class BrokersBase extends AdminResource {
             @ApiParam(name = "maxConcurrentUnloadPerSec",
                     value = "if the value absent(value=0) means no concurrent limitation.")
             @QueryParam("maxConcurrentUnloadPerSec") int maxConcurrentUnloadPerSec,
-            @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic
+            @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic,
+            @Suspended final AsyncResponse asyncResponse
     ) {
         validateSuperUserAccess();
-        doShutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
+        doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic)
+                .thenAccept(__ -> {
+                    LOG.info("[{}] Successfully shutdown broker gracefully", clientAppId());
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+            LOG.error("[{}] Failed to shutdown broker gracefully", clientAppId(), ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
     }
 
-    private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
-                                            boolean forcedTerminateTopic) {
+    private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurrentUnloadPerSec,
+                                                                    boolean forcedTerminateTopic) {
         pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
-        pulsar().closeAsync();
+        return pulsar().closeAsync();
     }
 }
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index 464d02121cf..29c280f8ba5 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -326,10 +326,11 @@ public interface Brokers {
     CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
 
     /**
-     * Shutdown current broker gracefully.
-     * @param maxConcurrentUnloadPerSec
-     * @param forcedTerminateTopic
-     * @return
+     * Trigger the current broker to graceful-shutdown asynchronously.
+     *
+     * @param maxConcurrentUnloadPerSec the maximum number of topics to unload per second.
+     *                                  This helps control the speed of the unload operation during shutdown.
+     * @param forcedTerminateTopic if true, topics will be forcefully terminated during the shutdown process.
      */
     CompletableFuture<Void> shutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
                                                      boolean forcedTerminateTopic);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index ce2c44ec1e8..381bc8abcaa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.client.admin.internal.BaseResource.getApiException;
 import com.beust.jcommander.DefaultUsageFormatter;
 import com.beust.jcommander.IUsageFormatter;
 import com.beust.jcommander.JCommander;
@@ -26,10 +27,15 @@ import com.beust.jcommander.ParameterException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 
 public abstract class CmdBase {
     protected final JCommander jcommander;
@@ -37,6 +43,12 @@ public abstract class CmdBase {
     private PulsarAdmin admin;
     private IUsageFormatter usageFormatter;
 
+    /**
+     * Default read timeout in milliseconds.
+     * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+     */
+    private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+
     @Parameter(names = { "--help", "-h" }, help = true, hidden = true)
     private boolean help = false;
 
@@ -124,6 +136,28 @@ public abstract class CmdBase {
         return admin;
     }
 
+    protected long getReadTimeoutMs() {
+        PulsarAdmin pulsarAdmin = getAdmin();
+        if (pulsarAdmin instanceof PulsarAdminImpl) {
+            return ((PulsarAdminImpl) pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+        }
+        return DEFAULT_READ_TIMEOUT_MILLIS;
+    }
+
+    protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws PulsarAdminException {
+        try {
+            return executor.get().get(getReadTimeoutMs(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            throw PulsarAdminException.wrap(getApiException(e.getCause()));
+        } catch (Exception e) {
+            throw PulsarAdminException.wrap(getApiException(e));
+        }
+    }
 
     static Map<String, String> parseListKeyValueMap(List<String> metadata) {
         Map<String, String> map = null;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 1e86edcf59c..f1571e96c65 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -150,8 +150,8 @@ public class CmdBrokers extends CmdBase {
 
         @Override
         void run() throws Exception {
-            getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
-            System.out.println("Successfully trigger broker shutdown gracefully");
+            sync(() -> getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic));
+            System.out.println("Successfully shutdown broker gracefully");
         }
 
     }