You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ha...@apache.org on 2023/12/27 02:48:38 UTC

(flink) branch master updated (11cdf7e7ada -> d0dbd51c89d)

This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 11cdf7e7ada [FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module
     new 81598f8a5c7 [FLINK-32881][checkpoint] Support triggering savepoint in detach mode (-detach) for CLI
     new 421f50e38a2 [FLINK-32881][checkpoint] add detached mode for stop-with-savepoint
     new d0dbd51c89d [FLINK-32881][checkpoint] update docs for savepoint detached option

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/deployment/cli.md             |  39 +++++++
 docs/content.zh/docs/ops/state/savepoints.md       |  10 ++
 docs/content.zh/docs/ops/upgrading.md              |   3 +
 docs/content/docs/deployment/cli.md                |  39 +++++++
 docs/content/docs/ops/state/savepoints.md          |  12 ++
 docs/content/docs/ops/upgrading.md                 |   3 +
 .../org/apache/flink/client/cli/CliFrontend.java   | 128 ++++++++++++++++++---
 .../apache/flink/client/cli/CliFrontendParser.java |  20 +++-
 .../apache/flink/client/cli/SavepointOptions.java  |   7 ++
 .../org/apache/flink/client/cli/StopOptions.java   |   9 ++
 .../apache/flink/client/program/ClusterClient.java |  34 ++++++
 .../flink/client/program/MiniClusterClient.java    |  16 +++
 .../client/program/rest/RestClusterClient.java     | 125 ++++++++++++--------
 .../flink/client/cli/CliFrontendSavepointTest.java |  40 +++++++
 .../cli/CliFrontendStopWithSavepointTest.java      |  30 +++++
 .../flink/client/program/TestingClusterClient.java |  40 +++++++
 .../flink/runtime/minicluster/MiniCluster.java     |  40 +++++++
 .../environment/RemoteStreamEnvironmentTest.java   |  17 +++
 18 files changed, 545 insertions(+), 67 deletions(-)


(flink) 01/03: [FLINK-32881][checkpoint] Support triggering savepoint in detach mode (-detach) for CLI

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81598f8a5c78effd2a06fda3557cdae23dfb4b79
Author: 周仁祥 <zh...@bytedance.com>
AuthorDate: Tue Nov 21 00:31:57 2023 +0800

    [FLINK-32881][checkpoint] Support triggering savepoint in detach mode (-detach) for CLI
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 41 +++++++++++++++-
 .../apache/flink/client/cli/CliFrontendParser.java | 14 +++++-
 .../apache/flink/client/cli/SavepointOptions.java  |  7 +++
 .../apache/flink/client/program/ClusterClient.java | 15 ++++++
 .../flink/client/program/MiniClusterClient.java    |  6 +++
 .../client/program/rest/RestClusterClient.java     | 57 +++++++++++++++-------
 .../flink/client/cli/CliFrontendSavepointTest.java | 40 +++++++++++++++
 .../flink/client/program/TestingClusterClient.java | 19 ++++++++
 .../flink/runtime/minicluster/MiniCluster.java     | 20 ++++++++
 .../environment/RemoteStreamEnvironmentTest.java   |  8 +++
 10 files changed, 206 insertions(+), 21 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 97c16010bf0..9aa1450b46f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -781,13 +781,26 @@ public class CliFrontend {
             runClusterAction(
                     activeCommandLine,
                     commandLine,
-                    (clusterClient, effectiveConfiguration) ->
+                    (clusterClient, effectiveConfiguration) -> {
+                        // Trigger savepoint in detached mode
+                        if (savepointOptions.isDetached()) {
+                            // trigger savepoint in detached mode and
+                            // return the trigger id immediately
+                            triggerDetachedSavepoint(
+                                    clusterClient,
+                                    jobId,
+                                    savepointDirectory,
+                                    savepointOptions.getFormatType(),
+                                    getClientTimeout(effectiveConfiguration));
+                        } else {
                             triggerSavepoint(
                                     clusterClient,
                                     jobId,
                                     savepointDirectory,
                                     savepointOptions.getFormatType(),
-                                    getClientTimeout(effectiveConfiguration)));
+                                    getClientTimeout(effectiveConfiguration));
+                        }
+                    });
         }
     }
 
@@ -819,6 +832,30 @@ public class CliFrontend {
         }
     }
 
+    /** Sends a SavepointTriggerMessage to the job manager in detached mode. */
+    private void triggerDetachedSavepoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            String savepointDirectory,
+            SavepointFormatType formatType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering savepoint in detached mode for job " + jobId + '.');
+
+        try {
+            final String triggerId =
+                    clusterClient
+                            .triggerDetachedSavepoint(jobId, savepointDirectory, formatType)
+                            .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+            logAndSysout("Successfully trigger manual savepoint, triggerId: " + triggerId);
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+            throw new FlinkException(
+                    "Triggering a detached savepoint for the job " + jobId + " failed.", cause);
+        }
+    }
+
     /** Sends a SavepointDisposalRequest to the job manager. */
     private void disposeSavepoint(
             ClusterClient<?> clusterClient, String savepointPath, Duration clientTimeout)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 37fdb928c8c..43491a97ee1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -164,6 +164,14 @@ public class CliFrontendParser {
                     false,
                     "Defines whether to trigger this checkpoint as a full one.");
 
+    static final Option SAVEPOINT_DETACHED_OPTION =
+            new Option(
+                    "detached",
+                    false,
+                    "Triggering savepoint in detached mode, client and JM are decoupled,"
+                            + " return the savepoint trigger id as the unique identification of"
+                            + " the detached savepoint.");
+
     // list specific options
     static final Option RUNNING_OPTION =
             new Option("r", "running", false, "Show only running programs and their JobIDs");
@@ -446,7 +454,8 @@ public class CliFrontendParser {
         return buildGeneralOptions(new Options())
                 .addOption(SAVEPOINT_DISPOSE_OPTION)
                 .addOption(JAR_OPTION)
-                .addOption(SAVEPOINT_FORMAT_OPTION);
+                .addOption(SAVEPOINT_FORMAT_OPTION)
+                .addOption(SAVEPOINT_DETACHED_OPTION);
     }
 
     static Options getCheckpointCommandOptions() {
@@ -490,7 +499,8 @@ public class CliFrontendParser {
     private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
         return options.addOption(SAVEPOINT_DISPOSE_OPTION)
                 .addOption(SAVEPOINT_FORMAT_OPTION)
-                .addOption(JAR_OPTION);
+                .addOption(JAR_OPTION)
+                .addOption(SAVEPOINT_DETACHED_OPTION);
     }
 
     private static Options getCheckpointOptionsWithoutDeprecatedOptions(Options options) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
index e0ef3555edd..6a8057069c2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.commons.cli.CommandLine;
 
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_FORMAT_OPTION;
 
@@ -35,6 +36,7 @@ public class SavepointOptions extends CommandLineOptions {
     private boolean dispose;
     private String disposeSavepointPath;
     private String jarFile;
+    private boolean isDetached;
 
     public SavepointOptions(CommandLine line) {
         super(line);
@@ -42,6 +44,7 @@ public class SavepointOptions extends CommandLineOptions {
         dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt());
         disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt());
         jarFile = line.getOptionValue(JAR_OPTION.getOpt());
+        isDetached = line.hasOption(SAVEPOINT_DETACHED_OPTION.getOpt());
         if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
             formatType =
                     ConfigurationUtils.convertValue(
@@ -71,4 +74,8 @@ public class SavepointOptions extends CommandLineOptions {
     public SavepointFormatType getFormatType() {
         return formatType;
     }
+
+    public boolean isDetached() {
+        return isDetached;
+    }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 4555bc32dac..f25946971c3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -188,6 +188,21 @@ public interface ClusterClient<T> extends AutoCloseable {
      */
     CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType);
 
+    /**
+     * Triggers a detached savepoint for the job identified by the job id. The savepoint will be
+     * written to the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     * Notice that: the detached savepoint will return with a savepoint trigger id instead of the
+     * path future, that means the client will return very quickly.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param formatType a binary format of the savepoint
+     * @return the savepoint trigger id
+     */
+    CompletableFuture<String> triggerDetachedSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType);
+
     /**
      * Sends out a request to a specified coordinator and return the response.
      *
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 292524ec5f3..889fa8b8202 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -115,6 +115,12 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
         return miniCluster.triggerCheckpoint(jobId, checkpointType);
     }
 
+    @Override
+    public CompletableFuture<String> triggerDetachedSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
+        return miniCluster.triggerDetachedSavepoint(jobId, savepointDirectory, false, formatType);
+    }
+
     @Override
     public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
         return miniCluster.disposeSavepoint(savepointPath);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index d293fc33cec..06587021bcb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -539,7 +539,7 @@ public class RestClusterClient<T> implements ClusterClient<T> {
     @Override
     public CompletableFuture<String> cancelWithSavepoint(
             JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
-        return triggerSavepoint(jobId, savepointDirectory, true, formatType);
+        return triggerSavepoint(jobId, savepointDirectory, true, formatType, false);
     }
 
     @Override
@@ -547,7 +547,7 @@ public class RestClusterClient<T> implements ClusterClient<T> {
             final JobID jobId,
             final @Nullable String savepointDirectory,
             final SavepointFormatType formatType) {
-        return triggerSavepoint(jobId, savepointDirectory, false, formatType);
+        return triggerSavepoint(jobId, savepointDirectory, false, formatType, false);
     }
 
     @Override
@@ -580,6 +580,14 @@ public class RestClusterClient<T> implements ClusterClient<T> {
                         });
     }
 
+    @Override
+    public CompletableFuture<String> triggerDetachedSavepoint(
+            final JobID jobId,
+            final @Nullable String savepointDirectory,
+            final SavepointFormatType formatType) {
+        return triggerSavepoint(jobId, savepointDirectory, false, formatType, true);
+    }
+
     @Override
     public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
             JobID jobId, OperatorID operatorId, CoordinationRequest request) {
@@ -615,7 +623,8 @@ public class RestClusterClient<T> implements ClusterClient<T> {
             final JobID jobId,
             final @Nullable String savepointDirectory,
             final boolean cancelJob,
-            final SavepointFormatType formatType) {
+            final SavepointFormatType formatType,
+            final boolean isDetachedMode) {
         final SavepointTriggerHeaders savepointTriggerHeaders =
                 SavepointTriggerHeaders.getInstance();
         final SavepointTriggerMessageParameters savepointTriggerMessageParameters =
@@ -629,20 +638,34 @@ public class RestClusterClient<T> implements ClusterClient<T> {
                         new SavepointTriggerRequestBody(
                                 savepointDirectory, cancelJob, formatType, null));
 
-        return responseFuture
-                .thenCompose(
-                        savepointTriggerResponseBody -> {
-                            final TriggerId savepointTriggerId =
-                                    savepointTriggerResponseBody.getTriggerId();
-                            return pollSavepointAsync(jobId, savepointTriggerId);
-                        })
-                .thenApply(
-                        savepointInfo -> {
-                            if (savepointInfo.getFailureCause() != null) {
-                                throw new CompletionException(savepointInfo.getFailureCause());
-                            }
-                            return savepointInfo.getLocation();
-                        });
+        CompletableFuture<String> futureResult;
+        if (isDetachedMode) {
+            // we just return the savepoint trigger id in detached savepoint,
+            // that means the client could exit immediately
+            futureResult =
+                    responseFuture.thenApply((TriggerResponse tr) -> tr.getTriggerId().toString());
+        } else {
+            // otherwise we need to wait the savepoint to be succeeded
+            // and return the savepoint path
+            futureResult =
+                    responseFuture
+                            .thenCompose(
+                                    savepointTriggerResponseBody -> {
+                                        final TriggerId savepointTriggerId =
+                                                savepointTriggerResponseBody.getTriggerId();
+                                        return pollSavepointAsync(jobId, savepointTriggerId);
+                                    })
+                            .thenApply(
+                                    savepointInfo -> {
+                                        if (savepointInfo.getFailureCause() != null) {
+                                            throw new CompletionException(
+                                                    savepointInfo.getFailureCause());
+                                        }
+                                        return savepointInfo.getLocation();
+                                    });
+        }
+
+        return futureResult;
     }
 
     @Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
index f6e059bb258..c3e0c127c66 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendSavepointTest.java
@@ -209,6 +209,34 @@ class CliFrontendSavepointTest extends CliFrontendTestBase {
         }
     }
 
+    // ------------------------------------------------------------------------
+    // detached savepoint
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testTriggerDetachedSavepointSuccess() throws Exception {
+
+        JobID jobId = new JobID();
+
+        String savepointPath = "expectedSavepointPath";
+
+        final ClusterClient<String> clusterClient = createDetachClusterClient(savepointPath);
+
+        try {
+            MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
+
+            String[] parameters = {"-detached", jobId.toString()};
+            frontend.savepoint(parameters);
+
+            verify(clusterClient, times(1))
+                    .triggerDetachedSavepoint(eq(jobId), isNull(), eq(SavepointFormatType.DEFAULT));
+
+            assertThat(buffer.toString()).contains(savepointPath);
+        } finally {
+            clusterClient.close();
+        }
+    }
+
     /** Tests disposal with a JAR file. */
     @Test
     void testDisposeWithJar(@TempDir java.nio.file.Path tmp) throws Exception {
@@ -315,6 +343,18 @@ class CliFrontendSavepointTest extends CliFrontendTestBase {
         return clusterClient;
     }
 
+    private static ClusterClient<String> createDetachClusterClient(String expectedResponse) {
+        final ClusterClient<String> clusterClient = mock(ClusterClient.class);
+
+        when(clusterClient.triggerDetachedSavepoint(
+                        any(JobID.class),
+                        nullable(String.class),
+                        nullable(SavepointFormatType.class)))
+                .thenReturn(CompletableFuture.completedFuture(expectedResponse));
+
+        return clusterClient;
+    }
+
     private static ClusterClient<String> createFailingClusterClient(Exception expectedException) {
         final ClusterClient<String> clusterClient = mock(ClusterClient.class);
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index b175df76198..0649ff3e777 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.util.function.QuadFunction;
 import org.apache.flink.util.function.TriFunction;
 
@@ -59,6 +60,10 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
             triggerSavepointFunction =
                     (ignore, savepointPath, formatType) ->
                             CompletableFuture.completedFuture(savepointPath);
+    private TriFunction<JobID, String, SavepointFormatType, CompletableFuture<String>>
+            triggerDetachedSavepointFunction =
+                    (ignore, savepointPath, formatType) ->
+                            CompletableFuture.completedFuture(new TriggerId().toString());
 
     private BiFunction<JobID, CheckpointType, CompletableFuture<Long>> triggerCheckpointFunction =
             (ignore, checkpointType) -> CompletableFuture.completedFuture(1L);
@@ -85,6 +90,12 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
         this.triggerSavepointFunction = triggerSavepointFunction;
     }
 
+    public void setTriggerDetachedSavepointFunction(
+            TriFunction<JobID, String, SavepointFormatType, CompletableFuture<String>>
+                    triggerDetachedSavepointFunction) {
+        this.triggerDetachedSavepointFunction = triggerDetachedSavepointFunction;
+    }
+
     public void setTriggerCheckpointFunction(
             BiFunction<JobID, CheckpointType, CompletableFuture<Long>> triggerCheckpointFunction) {
         this.triggerCheckpointFunction = triggerCheckpointFunction;
@@ -172,6 +183,14 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
         return triggerCheckpointFunction.apply(jobId, checkpointType);
     }
 
+    @Override
+    public CompletableFuture<String> triggerDetachedSavepoint(
+            JobID jobId,
+            @org.jetbrains.annotations.Nullable String savepointDirectory,
+            SavepointFormatType formatType) {
+        return triggerDetachedSavepointFunction.apply(jobId, savepointDirectory, formatType);
+    }
+
     @Override
     public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
             JobID jobId, OperatorID operatorId, CoordinationRequest request) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a88070d2e0d..cc8b6bb91e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -891,6 +891,26 @@ public class MiniCluster implements AutoCloseableAsync {
                                 rpcTimeout));
     }
 
+    public CompletableFuture<String> triggerDetachedSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            boolean cancelJob,
+            SavepointFormatType formatType) {
+        return runDispatcherCommand(
+                dispatcherGateway -> {
+                    dispatcherGateway.triggerSavepointAndGetLocation(
+                            jobId,
+                            targetDirectory,
+                            formatType,
+                            cancelJob
+                                    ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT
+                                    : TriggerSavepointMode.SAVEPOINT,
+                            rpcTimeout);
+                    // return immediately, no need to wait for the future savepoint path
+                    return CompletableFuture.completedFuture("");
+                });
+    }
+
     public CompletableFuture<String> triggerCheckpoint(JobID jobID) {
         return runDispatcherCommand(
                 dispatcherGateway -> dispatcherGateway.triggerCheckpoint(jobID, rpcTimeout));
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 9f225bfdd10..2f89d3cafbf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -281,6 +281,14 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> triggerDetachedSavepoint(
+                JobID jobId,
+                @org.jetbrains.annotations.Nullable String savepointDirectory,
+                SavepointFormatType formatType) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
                 JobID jobId, OperatorID operatorId, CoordinationRequest request) {


(flink) 03/03: [FLINK-32881][checkpoint] update docs for savepoint detached option

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0dbd51c89df6374d5eb54e12925e032a255385e
Author: 周仁祥 <zh...@bytedance.com>
AuthorDate: Tue Dec 12 16:19:43 2023 +0800

    [FLINK-32881][checkpoint] update docs for savepoint detached option
---
 docs/content.zh/docs/deployment/cli.md       | 39 ++++++++++++++++++++++++++++
 docs/content.zh/docs/ops/state/savepoints.md | 10 +++++++
 docs/content.zh/docs/ops/upgrading.md        |  3 +++
 docs/content/docs/deployment/cli.md          | 39 ++++++++++++++++++++++++++++
 docs/content/docs/ops/state/savepoints.md    | 12 +++++++++
 docs/content/docs/ops/upgrading.md           |  3 +++
 6 files changed, 106 insertions(+)

diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md
index 544dcb835b8..d71dbdd4e2c 100644
--- a/docs/content.zh/docs/deployment/cli.md
+++ b/docs/content.zh/docs/deployment/cli.md
@@ -125,6 +125,43 @@ Lastly, you can optionally provide what should be the [binary format]({{< ref "d
 
 The path to the savepoint can be used later on to [restart the Flink job](#starting-a-job-from-a-savepoint).
 
+If the state size of the job is quite big, the client will get a timeout exception since it has to wait for the savepoint to finish.
+```
+Triggering savepoint for job bec5244e09634ad71a80785937a9732d.
+Waiting for response...
+
+--------------------------------------------------------------
+The program finished with the following exception:
+
+org.apache.flink.util.FlinkException: Triggering a savepoint for the job bec5244e09634ad71a80785937a9732d failed.
+        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:828)
+        at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:794)
+        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1078)
+        at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:779)
+        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1150)
+        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1226)
+        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
+        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1226)
+        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1194)
+Caused by: java.util.concurrent.TimeoutException
+        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
+        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
+        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:822)
+        ... 8 more
+```
+In this case, we can use the "-detached" option to trigger a detached savepoint; the client will then return the trigger id immediately.
+```bash
+$ ./bin/flink savepoint \
+      $JOB_ID \
+      /tmp/flink-savepoints \
+      -detached
+```
+```
+Triggering savepoint in detached mode for job bec5244e09634ad71a80785937a9732d.
+Successfully trigger manual savepoint, triggerId: 2505bbd12c5b58fd997d0f193db44b97
+```
+The status of the detached savepoint can be queried via the [REST API]({{< ref "docs/ops/rest_api" >}}/#jobs-jobid-checkpoints-triggerid).
+
 #### Disposing a Savepoint
 
 The `savepoint` action can be also used to remove savepoints. `--dispose` with the corresponding 
@@ -214,6 +251,8 @@ Use the `--drain` flag if you want to terminate the job permanently.
 If you want to resume the job at a later point in time, then do not drain the pipeline because it could lead to incorrect results when the job is resumed.
 {{< /hint >}}
 
+If you want to trigger the savepoint in detached mode, add the `-detached` option to the command.
+
 Lastly, you can optionally provide what should be the [binary format]({{< ref "docs/ops/state/savepoints" >}}#savepoint-format) of the savepoint.
 
 #### Cancelling a Job Ungracefully
diff --git a/docs/content.zh/docs/ops/state/savepoints.md b/docs/content.zh/docs/ops/state/savepoints.md
index 7b14669db4f..2294fc97923 100644
--- a/docs/content.zh/docs/ops/state/savepoints.md
+++ b/docs/content.zh/docs/ops/state/savepoints.md
@@ -142,6 +142,14 @@ $ bin/flink savepoint :jobId [:targetDirectory]
 $ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]
 ```
 
+使用上述命令触发savepoint时,client需要等待savepoint制作完成,因此当任务的状态较大时,可能会导致client出现超时的情况。在这种情况下可以使用detach模式来触发savepoint。
+
+```shell
+$ bin/flink savepoint :jobId [:targetDirectory] -detached
+```
+
+使用该命令时,client拿到本次savepoint的trigger id后立即返回,可以通过[REST API]({{< ref "docs/ops/rest_api" >}}/#jobs-jobid-checkpoints-triggerid)来监控本次savepoint的制作情况。
+
 #### 使用 YARN 触发 Savepoint
 
 ```shell
@@ -160,6 +168,8 @@ $ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :j
 
 这将自动触发 ID 为 `:jobid` 的作业的 Savepoint,并停止该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。
 
+如果你想使用detach模式触发Savepoint,在命令行后添加选项`-detached`即可。
+
 ### 从 Savepoint 恢复
 
 ```shell
diff --git a/docs/content.zh/docs/ops/upgrading.md b/docs/content.zh/docs/ops/upgrading.md
index 322c00a566d..ff433609001 100644
--- a/docs/content.zh/docs/ops/upgrading.md
+++ b/docs/content.zh/docs/ops/upgrading.md
@@ -70,6 +70,8 @@ That same code would have to be recompiled when upgrading to 1.16.0 though.
 ```
 建议定期获取 Savepoint ,以便能够从之前的时间点重新启动应用程序。
 
+如果你想使用detach模式触发 Savepoint,只需添加选项`-detached`。
+
 * 作获取 Savepoint 并停止应用程序。
 ```bash
 > ./bin/flink cancel -s [ Savepoint 的路径] <jobID>
@@ -216,6 +218,7 @@ val mappedEvents: DataStream[(Int, Long)] = events
 ```shell
 $ bin/flink stop [--savepointPath :savepointPath] :jobId
 ```
+如果你想使用detach模式触发Savepoint,在命令行后添加选项`-detached`即可。
 
 更多详情,请阅读 [savepoint documentation]({{< ref "docs/ops/state/savepoints" >}}).
 
diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md
index 198c1e1c93b..a8818a4fb6b 100644
--- a/docs/content/docs/deployment/cli.md
+++ b/docs/content/docs/deployment/cli.md
@@ -123,6 +123,43 @@ Lastly, you can optionally provide what should be the [binary format]({{< ref "d
 
 The path to the savepoint can be used later on to [restart the Flink job](#starting-a-job-from-a-savepoint).
 
+If the state of the job is large, the client may fail with a timeout exception because it has to wait for the savepoint to complete.
+```
+Triggering savepoint for job bec5244e09634ad71a80785937a9732d.
+Waiting for response...
+
+--------------------------------------------------------------
+The program finished with the following exception:
+
+org.apache.flink.util.FlinkException: Triggering a savepoint for the job bec5244e09634ad71a80785937a9732d failed.
+        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:828)
+        at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:794)
+        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1078)
+        at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:779)
+        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1150)
+        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1226)
+        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
+        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1226)
+        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1194)
+Caused by: java.util.concurrent.TimeoutException
+        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
+        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
+        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:822)
+        ... 8 more
+```
+In this case, you can use the `-detached` option to trigger a detached savepoint. The client then returns as soon as it has received the trigger id.
+```bash
+$ ./bin/flink savepoint \
+      $JOB_ID \
+      /tmp/flink-savepoints \
+      -detached
+```
+```
+Triggering savepoint in detached mode for job bec5244e09634ad71a80785937a9732d.
+Successfully trigger manual savepoint, triggerId: 2505bbd12c5b58fd997d0f193db44b97
+```
+You can check the status of the detached savepoint through the [REST API]({{< ref "docs/ops/rest_api" >}}/#jobs-jobid-checkpoints-triggerid).
+
 #### Disposing a Savepoint
 
 The `savepoint` action can be also used to remove savepoints. `--dispose` with the corresponding 
@@ -212,6 +249,8 @@ Use the `--drain` flag if you want to terminate the job permanently.
 If you want to resume the job at a later point in time, then do not drain the pipeline because it could lead to incorrect results when the job is resumed.
 {{< /hint >}}
 
+If you want to trigger the savepoint in detached mode, add option `-detached` to the command.
+
 Lastly, you can optionally provide what should be the [binary format]({{< ref "docs/ops/state/savepoints" >}}#savepoint-format) of the savepoint.
 
 #### Cancelling a Job Ungracefully
diff --git a/docs/content/docs/ops/state/savepoints.md b/docs/content/docs/ops/state/savepoints.md
index c08587ec178..c13cd62e6cc 100644
--- a/docs/content/docs/ops/state/savepoints.md
+++ b/docs/content/docs/ops/state/savepoints.md
@@ -167,6 +167,16 @@ the savepoint should be taken. By default the savepoint will be taken in canonic
 $ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]
 ```
 
+When using the above command to trigger a savepoint, the client needs to wait for the savepoint 
+to be completed. Therefore, the client may time out when the state size of the task is large.
+In this case, you can trigger the savepoint in detached mode.
+
+```shell
+$ bin/flink savepoint :jobId [:targetDirectory] -detached
+```
+When using this command, the client returns immediately after getting the trigger id of 
+the savepoint. You can monitor the status of the savepoint through the [REST API]({{< ref "docs/ops/rest_api" >}}/#jobs-jobid-checkpoints-triggerid).
+
 #### Trigger a Savepoint with YARN
 
 ```shell
@@ -186,6 +196,8 @@ you can specify a target file system directory to store the savepoint in. The di
 accessible by the JobManager(s) and TaskManager(s). You can also pass a type in which the savepoint
 should be taken. By default the savepoint will be taken in canonical format.
 
+If you want to trigger the savepoint in detached mode, add option `-detached` to the command.
+
 ### Resuming from Savepoints
 
 ```shell
diff --git a/docs/content/docs/ops/upgrading.md b/docs/content/docs/ops/upgrading.md
index cc7d5e28cd8..b06427c2a96 100644
--- a/docs/content/docs/ops/upgrading.md
+++ b/docs/content/docs/ops/upgrading.md
@@ -103,6 +103,7 @@ There are two ways of taking a savepoint from a running streaming application.
 > ./bin/flink savepoint <jobID> [pathToSavepoint]
 ```
 It is recommended to periodically take savepoints in order to be able to restart an application from a previous point in time.
+If you want to trigger a savepoint in detached mode, just add the option `-detached`.
 
 * Taking a savepoint and stopping the application as a single action. 
 ```bash
@@ -251,6 +252,8 @@ You can do this with the command:
 $ bin/flink stop [--savepointPath :savepointPath] :jobId
 ```
 
+If you want to trigger the savepoint in detached mode, add option `-detached` to the command.
+
 For more details, please read the [savepoint documentation]({{< ref "docs/ops/state/savepoints" >}}).
 
 #### STEP 2: Update your cluster to the new Flink version.


(flink) 02/03: [FLINK-32881][checkpoint] add detached mode for stop-with-savepoint

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 421f50e38a21974d043278d6161fab50fdb8c3e6
Author: 周仁祥 <zh...@bytedance.com>
AuthorDate: Tue Dec 12 16:17:38 2023 +0800

    [FLINK-32881][checkpoint] add detached mode for stop-with-savepoint
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 87 ++++++++++++++++++----
 .../apache/flink/client/cli/CliFrontendParser.java |  6 +-
 .../org/apache/flink/client/cli/StopOptions.java   |  9 +++
 .../apache/flink/client/program/ClusterClient.java | 19 +++++
 .../flink/client/program/MiniClusterClient.java    | 10 +++
 .../client/program/rest/RestClusterClient.java     | 68 ++++++++++-------
 .../cli/CliFrontendStopWithSavepointTest.java      | 30 ++++++++
 .../flink/client/program/TestingClusterClient.java | 21 ++++++
 .../flink/runtime/minicluster/MiniCluster.java     | 20 +++++
 .../environment/RemoteStreamEnvironmentTest.java   |  9 +++
 10 files changed, 233 insertions(+), 46 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 9aa1450b46f..60b34f3db9b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -581,23 +581,26 @@ public class CliFrontend {
                 activeCommandLine,
                 commandLine,
                 (clusterClient, effectiveConfiguration) -> {
-                    final String savepointPath;
-                    try {
-                        savepointPath =
-                                clusterClient
-                                        .stopWithSavepoint(
-                                                jobId,
-                                                advanceToEndOfEventTime,
-                                                targetDirectory,
-                                                formatType)
-                                        .get(
-                                                getClientTimeout(effectiveConfiguration).toMillis(),
-                                                TimeUnit.MILLISECONDS);
-                    } catch (Exception e) {
-                        throw new FlinkException(
-                                "Could not stop with a savepoint job \"" + jobId + "\".", e);
+                    // Trigger savepoint in detached mode
+                    if (stopOptions.isDetached()) {
+                        // trigger stop-with-savepoint in detached mode and
+                        // return the trigger id immediately
+                        stopWithDetachedSavepoint(
+                                clusterClient,
+                                jobId,
+                                advanceToEndOfEventTime,
+                                targetDirectory,
+                                formatType,
+                                getClientTimeout(effectiveConfiguration));
+                    } else {
+                        stopWithSavepoint(
+                                clusterClient,
+                                jobId,
+                                advanceToEndOfEventTime,
+                                targetDirectory,
+                                formatType,
+                                getClientTimeout(effectiveConfiguration));
                     }
-                    logAndSysout("Savepoint completed. Path: " + savepointPath);
                 });
     }
 
@@ -804,6 +807,58 @@ public class CliFrontend {
         }
     }
 
+    /**
+     * Stops the job with a savepoint and blocks until the savepoint path is available.
+     *
+     * <p>Sends the request through {@link ClusterClient#stopWithSavepoint}, waits at most {@code
+     * clientTimeout} for the returned future and prints the resulting savepoint path.
+     *
+     * @param clusterClient client used to send the stop-with-savepoint request
+     * @param jobId id of the job to stop
+     * @param advanceToEndOfEventTime passed through unchanged to the cluster client
+     * @param targetDirectory directory for the savepoint, passed through to the cluster client
+     * @param formatType binary format of the savepoint
+     * @param clientTimeout maximum time to wait for the savepoint path
+     * @throws FlinkException if waiting for the savepoint path fails for any reason, including a
+     *     timeout
+     */
+    private void stopWithSavepoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            String targetDirectory,
+            SavepointFormatType formatType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering stop-with-savepoint for job " + jobId + '.');
+
+        CompletableFuture<String> savepointPathFuture =
+                clusterClient.stopWithSavepoint(
+                        jobId, advanceToEndOfEventTime, targetDirectory, formatType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final String savepointPath =
+                    savepointPathFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+            logAndSysout("Savepoint completed. Path: " + savepointPath);
+        } catch (Exception e) {
+            throw new FlinkException("Could not stop with a savepoint job \"" + jobId + "\".", e);
+        }
+    }
+
+    /**
+     * Stops the job with a savepoint in detached mode.
+     *
+     * <p>Sends the request through {@link ClusterClient#stopWithDetachedSavepoint}, waits at most
+     * {@code clientTimeout} for the trigger id and prints it. The savepoint path itself is not
+     * awaited here.
+     *
+     * @param clusterClient client used to send the stop-with-savepoint request
+     * @param jobId id of the job to stop
+     * @param advanceToEndOfEventTime passed through unchanged to the cluster client
+     * @param targetDirectory directory for the savepoint, passed through to the cluster client
+     * @param formatType binary format of the savepoint
+     * @param clientTimeout maximum time to wait for the trigger id
+     * @throws FlinkException if obtaining the trigger id fails for any reason, including a
+     *     timeout
+     */
+    private void stopWithDetachedSavepoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            String targetDirectory,
+            SavepointFormatType formatType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering stop-with-savepoint in detached mode for job " + jobId + '.');
+        try {
+            final String triggerId =
+                    clusterClient
+                            .stopWithDetachedSavepoint(
+                                    jobId, advanceToEndOfEventTime, targetDirectory, formatType)
+                            .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+            logAndSysout(
+                    "Successfully trigger stop-with-savepoint in detached mode, triggerId: "
+                            + triggerId);
+        } catch (Exception e) {
+            throw new FlinkException(
+                    "Could not stop with a detached savepoint job \"" + jobId + "\".", e);
+        }
+    }
+
     /** Sends a SavepointTriggerMessage to the job manager. */
     private void triggerSavepoint(
             ClusterClient<?> clusterClient,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 43491a97ee1..ceca4968d21 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -447,7 +447,8 @@ public class CliFrontendParser {
         return buildGeneralOptions(new Options())
                 .addOption(STOP_WITH_SAVEPOINT_PATH)
                 .addOption(STOP_AND_DRAIN)
-                .addOption(SAVEPOINT_FORMAT_OPTION);
+                .addOption(SAVEPOINT_FORMAT_OPTION)
+                .addOption(SAVEPOINT_DETACHED_OPTION);
     }
 
     static Options getSavepointCommandOptions() {
@@ -493,7 +494,8 @@ public class CliFrontendParser {
     private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
         return options.addOption(STOP_WITH_SAVEPOINT_PATH)
                 .addOption(STOP_AND_DRAIN)
-                .addOption(SAVEPOINT_FORMAT_OPTION);
+                .addOption(SAVEPOINT_FORMAT_OPTION)
+                .addOption(SAVEPOINT_DETACHED_OPTION);
     }
 
     private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
index 3363e1a9090..7c981fb53b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 
 import org.apache.commons.cli.CommandLine;
 
+import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_FORMAT_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN;
 import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH;
@@ -41,6 +42,8 @@ class StopOptions extends CommandLineOptions {
 
     private final SavepointFormatType formatType;
 
+    private boolean isDetached;
+
     StopOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
@@ -58,6 +61,8 @@ class StopOptions extends CommandLineOptions {
         } else {
             formatType = SavepointFormatType.DEFAULT;
         }
+
+        this.isDetached = line.hasOption(SAVEPOINT_DETACHED_OPTION.getOpt());
     }
 
     String[] getArgs() {
@@ -79,4 +84,8 @@ class StopOptions extends CommandLineOptions {
     public SavepointFormatType getFormatType() {
         return formatType;
     }
+
+    /** Returns whether the savepoint detached option was present on the parsed command line. */
+    public boolean isDetached() {
+        return isDetached;
+    }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index f25946971c3..30c126d1645 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -166,6 +166,25 @@ public interface ClusterClient<T> extends AutoCloseable {
             @Nullable final String savepointDirectory,
             final SavepointFormatType formatType);
 
+    /**
+     * Stops a program on the Flink cluster whose job-manager is configured in this client's
+     * configuration, taking a savepoint in detached mode. Stopping works only for streaming
+     * programs. Be aware that the program might continue to run for a while after the stop command
+     * has been sent, because all operators need to finish processing once the sources have stopped
+     * emitting data.
+     *
+     * <p>The returned future carries the savepoint trigger id rather than the savepoint path.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param formatType a binary format of the savepoint
+     * @return the savepoint trigger id
+     */
+    CompletableFuture<String> stopWithDetachedSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final SavepointFormatType formatType);
+
     /**
      * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
      * the given savepoint directory, or {@link
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 889fa8b8202..d8c64dd9eb3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -104,6 +104,16 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
                 jobId, savepointDirectory, advanceToEndOfEventTime, formatType);
     }
 
+    // Delegates to the MiniCluster. Note the argument order: the MiniCluster method takes the
+    // savepoint directory before the advance-to-end-of-event-time flag.
+    @Override
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            @Nullable String savepointDirectory,
+            SavepointFormatType formatType) {
+        return miniCluster.stopWithDetachedSavepoint(
+                jobId, savepointDirectory, advanceToEndOfEventTime, formatType);
+    }
+
     @Override
     public CompletableFuture<String> triggerSavepoint(
             JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 06587021bcb..6fa83259671 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -505,35 +505,16 @@ public class RestClusterClient<T> implements ClusterClient<T> {
             final boolean advanceToEndOfTime,
             @Nullable final String savepointDirectory,
             final SavepointFormatType formatType) {
+        return stopWithSavepoint(jobId, advanceToEndOfTime, savepointDirectory, formatType, false);
+    }
 
-        final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders =
-                StopWithSavepointTriggerHeaders.getInstance();
-
-        final SavepointTriggerMessageParameters stopWithSavepointTriggerMessageParameters =
-                stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters();
-        stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId);
-
-        final CompletableFuture<TriggerResponse> responseFuture =
-                sendRequest(
-                        stopWithSavepointTriggerHeaders,
-                        stopWithSavepointTriggerMessageParameters,
-                        new StopWithSavepointRequestBody(
-                                savepointDirectory, advanceToEndOfTime, formatType, null));
-
-        return responseFuture
-                .thenCompose(
-                        savepointTriggerResponseBody -> {
-                            final TriggerId savepointTriggerId =
-                                    savepointTriggerResponseBody.getTriggerId();
-                            return pollSavepointAsync(jobId, savepointTriggerId);
-                        })
-                .thenApply(
-                        savepointInfo -> {
-                            if (savepointInfo.getFailureCause() != null) {
-                                throw new CompletionException(savepointInfo.getFailureCause());
-                            }
-                            return savepointInfo.getLocation();
-                        });
+    // Detached variant: same request as stopWithSavepoint, but with isDetachedMode set to true.
+    @Override
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfTime,
+            @Nullable final String savepointDirectory,
+            final SavepointFormatType formatType) {
+        return stopWithSavepoint(jobId, advanceToEndOfTime, savepointDirectory, formatType, true);
     }
 
     @Override
@@ -619,6 +600,30 @@ public class RestClusterClient<T> implements ClusterClient<T> {
                         });
     }
 
+    /**
+     * Sends a stop-with-savepoint request for the given job and converts the trigger response
+     * into the future handed back to the caller.
+     *
+     * <p>Shared by the attached and detached entry points; {@code isDetachedMode} is forwarded to
+     * {@code getSavepointTriggerFuture}, which decides what the resulting future contains.
+     *
+     * <p>NOTE(review): this overload is {@code public} although it is not annotated with {@code
+     * Override} and the analogous {@code triggerSavepoint} helper is private - confirm the
+     * visibility is intended.
+     *
+     * @param jobId id of the job to stop
+     * @param advanceToEndOfTime forwarded to the request body
+     * @param savepointDirectory target directory for the savepoint, may be {@code null}
+     * @param formatType binary format of the savepoint
+     * @param isDetachedMode whether the request was issued in detached mode
+     * @return the future produced by {@code getSavepointTriggerFuture}
+     */
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfTime,
+            @Nullable final String savepointDirectory,
+            final SavepointFormatType formatType,
+            final boolean isDetachedMode) {
+
+        final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders =
+                StopWithSavepointTriggerHeaders.getInstance();
+
+        final SavepointTriggerMessageParameters stopWithSavepointTriggerMessageParameters =
+                stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters();
+        stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId);
+
+        final CompletableFuture<TriggerResponse> responseFuture =
+                sendRequest(
+                        stopWithSavepointTriggerHeaders,
+                        stopWithSavepointTriggerMessageParameters,
+                        new StopWithSavepointRequestBody(
+                                savepointDirectory, advanceToEndOfTime, formatType, null));
+
+        return getSavepointTriggerFuture(jobId, isDetachedMode, responseFuture);
+    }
+
     private CompletableFuture<String> triggerSavepoint(
             final JobID jobId,
             final @Nullable String savepointDirectory,
@@ -638,6 +643,13 @@ public class RestClusterClient<T> implements ClusterClient<T> {
                         new SavepointTriggerRequestBody(
                                 savepointDirectory, cancelJob, formatType, null));
 
+        return getSavepointTriggerFuture(jobId, isDetachedMode, responseFuture);
+    }
+
+    private CompletableFuture<String> getSavepointTriggerFuture(
+            JobID jobId,
+            boolean isDetachedMode,
+            CompletableFuture<TriggerResponse> responseFuture) {
         CompletableFuture<String> futureResult;
         if (isDetachedMode) {
             // we just return the savepoint trigger id in detached savepoint,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
index 927b9123519..66fa420e662 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.TestingClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -33,6 +34,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
@@ -142,6 +145,33 @@ class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
         stopWithSavepointLatch.await();
     }
 
+    @Test
+    void testStopWithDetachedSavepoint() throws Exception {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(buffer));
+
+        JobID jid = new JobID();
+
+        String[] parameters = {"-detached", jid.toString()};
+        OneShotLatch stopWithSavepointLatch = new OneShotLatch();
+        TestingClusterClient<String> clusterClient = new TestingClusterClient<>();
+        String savepointTriggerId = new TriggerId().toString();
+        clusterClient.setStopWithDetachedSavepointFunction(
+                (jobID, advanceToEndOfEventTime, savepointDirectory, formatType) -> {
+                    assertThat(jobID).isEqualTo(jid);
+                    assertThat(advanceToEndOfEventTime).isFalse();
+                    assertThat(savepointDirectory).isNull();
+                    stopWithSavepointLatch.trigger();
+                    return CompletableFuture.completedFuture(savepointTriggerId);
+                });
+        MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
+        testFrontend.stop(parameters);
+
+        stopWithSavepointLatch.await();
+        // the savepoint trigger id will output to the stdout in detached mode.
+        assertThat(buffer.toString()).contains(savepointTriggerId);
+    }
+
     @Test
     void testStopOnlyWithMaxWM() throws Exception {
         JobID jid = new JobID();
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index 0649ff3e777..261b62b0564 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -56,6 +56,11 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
             stopWithSavepointFunction =
                     (ignore1, ignore2, savepointPath, formatType) ->
                             CompletableFuture.completedFuture(savepointPath);
+    private QuadFunction<JobID, Boolean, String, SavepointFormatType, CompletableFuture<String>>
+            stopWithDetachedSavepointFunction =
+                    (ignore1, ignore2, savepointPath, formatType) ->
+                            CompletableFuture.completedFuture(new TriggerId().toString());
+
     private TriFunction<JobID, String, SavepointFormatType, CompletableFuture<String>>
             triggerSavepointFunction =
                     (ignore, savepointPath, formatType) ->
@@ -84,6 +89,12 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
         this.stopWithSavepointFunction = stopWithSavepointFunction;
     }
 
+    /** Replaces the function that {@code stopWithDetachedSavepoint} delegates to. */
+    public void setStopWithDetachedSavepointFunction(
+            QuadFunction<JobID, Boolean, String, SavepointFormatType, CompletableFuture<String>>
+                    stopWithDetachedSavepointFunction) {
+        this.stopWithDetachedSavepointFunction = stopWithDetachedSavepointFunction;
+    }
+
     public void setTriggerSavepointFunction(
             TriFunction<JobID, String, SavepointFormatType, CompletableFuture<String>>
                     triggerSavepointFunction) {
@@ -172,6 +183,16 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
                 jobId, advanceToEndOfEventTime, savepointDirectory, formatType);
     }
 
+    // Delegates to the configurable test function. Uses the file's imported @Nullable
+    // annotation, like the other overrides in this class.
+    @Override
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            @Nullable String savepointDirectory,
+            SavepointFormatType formatType) {
+        return stopWithDetachedSavepointFunction.apply(
+                jobId, advanceToEndOfEventTime, savepointDirectory, formatType);
+    }
+
     @Override
     public CompletableFuture<String> triggerSavepoint(
             JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index cc8b6bb91e8..740d7e5c701 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -940,6 +940,26 @@ public class MiniCluster implements AutoCloseableAsync {
                                 rpcTimeout));
     }
 
+    /**
+     * Issues a stop-with-savepoint request to the dispatcher without waiting for its result.
+     *
+     * <p>The future returned by the dispatcher call is not propagated, so a failure of that call
+     * is not reflected in the future returned here. The returned future is completed with an
+     * empty string, not with a trigger id.
+     *
+     * @param jobId id of the job to stop
+     * @param targetDirectory directory for the savepoint, forwarded to the dispatcher
+     * @param terminate if {@code true} the job is stopped with {@code TERMINATE_WITH_SAVEPOINT},
+     *     otherwise with {@code SUSPEND_WITH_SAVEPOINT}
+     * @param formatType binary format of the savepoint
+     * @return a future completed with an empty string
+     */
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            boolean terminate,
+            SavepointFormatType formatType) {
+        return runDispatcherCommand(
+                dispatcherGateway -> {
+                    dispatcherGateway.stopWithSavepointAndGetLocation(
+                            jobId,
+                            targetDirectory,
+                            formatType,
+                            terminate
+                                    ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
+                                    : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT,
+                            rpcTimeout);
+                    // return immediately, no need to wait for the future savepoint path
+                    return CompletableFuture.completedFuture("");
+                });
+    }
+
     public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
         return runDispatcherCommand(
                 dispatcherGateway -> dispatcherGateway.disposeSavepoint(savepointPath, rpcTimeout));
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 2f89d3cafbf..22a7b027bc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -269,6 +269,15 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
             return null;
         }
 
+        // Stub implementation that always returns null. Uses the file's imported @Nullable
+        // annotation, like the neighbouring overrides.
+        @Override
+        public CompletableFuture<String> stopWithDetachedSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                SavepointFormatType formatType) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {