You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/17 02:55:08 UTC
[pulsar] branch branch-3.0 updated: [fix][broker] Gracefully shutdown does not work with admin cli in standalone (#20709)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d5efaa03f4e [fix][broker] Gracefully shutdown does not work with admin cli in standalone (#20709)
d5efaa03f4e is described below
commit d5efaa03f4eb45ac95f8b99a6f5a971e9c15b796
Author: Kim, Joo Hyuk <be...@gmail.com>
AuthorDate: Wed Jul 26 15:08:16 2023 +0900
[fix][broker] Gracefully shutdown does not work with admin cli in standalone (#20709)
Fixes : #20617
### Motivation
Currently, clients' shutdown API does not behave consistently in sense that asynchronicity is not handled explicitly. So issue #20617 happens.
This PR will allow clients know that shutdown is actually triggered.
### Modifications
- Synchronize call to `POST /shutdown` on client side
- Asynchronize explicitly `pulsar().closeAsync()` invocation
---
.../pulsar/broker/admin/impl/BrokersBase.java | 20 +++++++++----
.../org/apache/pulsar/client/admin/Brokers.java | 9 +++---
.../java/org/apache/pulsar/admin/cli/CmdBase.java | 34 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdBrokers.java | 4 +--
4 files changed, 56 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3328fa9715b..a61a2c940b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -546,16 +546,26 @@ public class BrokersBase extends AdminResource {
@ApiParam(name = "maxConcurrentUnloadPerSec",
value = "if the value absent(value=0) means no concurrent limitation.")
@QueryParam("maxConcurrentUnloadPerSec") int maxConcurrentUnloadPerSec,
- @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic
+ @QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic,
+ @Suspended final AsyncResponse asyncResponse
) {
validateSuperUserAccess();
- doShutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
+ doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic)
+ .thenAccept(__ -> {
+ LOG.info("[{}] Successfully shutdown broker gracefully", clientAppId());
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ LOG.error("[{}] Failed to shutdown broker gracefully", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
- private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
- boolean forcedTerminateTopic) {
+ private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurrentUnloadPerSec,
+ boolean forcedTerminateTopic) {
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
- pulsar().closeAsync();
+ return pulsar().closeAsync();
}
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
index 464d02121cf..29c280f8ba5 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Brokers.java
@@ -326,10 +326,11 @@ public interface Brokers {
CompletableFuture<Void> healthcheckAsync(TopicVersion topicVersion);
/**
- * Shutdown current broker gracefully.
- * @param maxConcurrentUnloadPerSec
- * @param forcedTerminateTopic
- * @return
+ * Trigger the current broker to graceful-shutdown asynchronously.
+ *
+ * @param maxConcurrentUnloadPerSec the maximum number of topics to unload per second.
+ * This helps control the speed of the unload operation during shutdown.
+ * @param forcedTerminateTopic if true, topics will be forcefully terminated during the shutdown process.
*/
CompletableFuture<Void> shutDownBrokerGracefully(int maxConcurrentUnloadPerSec,
boolean forcedTerminateTopic);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index ce2c44ec1e8..381bc8abcaa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.admin.cli;
+import static org.apache.pulsar.client.admin.internal.BaseResource.getApiException;
import com.beust.jcommander.DefaultUsageFormatter;
import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.JCommander;
@@ -26,10 +27,15 @@ import com.beust.jcommander.ParameterException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
public abstract class CmdBase {
protected final JCommander jcommander;
@@ -37,6 +43,12 @@ public abstract class CmdBase {
private PulsarAdmin admin;
private IUsageFormatter usageFormatter;
+ /**
+ * Default read timeout in milliseconds.
+ * Used if not found from configuration data in {@link #getReadTimeoutMs()}
+ */
+ private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000;
+
@Parameter(names = { "--help", "-h" }, help = true, hidden = true)
private boolean help = false;
@@ -124,6 +136,28 @@ public abstract class CmdBase {
return admin;
}
+ protected long getReadTimeoutMs() {
+ PulsarAdmin pulsarAdmin = getAdmin();
+ if (pulsarAdmin instanceof PulsarAdminImpl) {
+ return ((PulsarAdminImpl) pulsarAdmin).getClientConfigData().getReadTimeoutMs();
+ }
+ return DEFAULT_READ_TIMEOUT_MILLIS;
+ }
+
+ protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws PulsarAdminException {
+ try {
+ return executor.get().get(getReadTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ } catch (ExecutionException e) {
+ throw PulsarAdminException.wrap(getApiException(e.getCause()));
+ } catch (Exception e) {
+ throw PulsarAdminException.wrap(getApiException(e));
+ }
+ }
static Map<String, String> parseListKeyValueMap(List<String> metadata) {
Map<String, String> map = null;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
index 1e86edcf59c..f1571e96c65 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBrokers.java
@@ -150,8 +150,8 @@ public class CmdBrokers extends CmdBase {
@Override
void run() throws Exception {
- getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
- System.out.println("Successfully trigger broker shutdown gracefully");
+ sync(() -> getAdmin().brokers().shutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic));
+ System.out.println("Successfully shutdown broker gracefully");
}
}