You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this text rendering.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/09 12:49:36 UTC

[flink] 03/03: [FLINK-25155] Support claim mode in rest api

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7641c239b777273126c10efab334e3247771a28b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Dec 3 15:47:03 2021 +0100

    [FLINK-25155] Support claim mode in rest api
---
 .../shortcodes/generated/rest_v1_dispatcher.html       | 10 ++++++++++
 .../apache/flink/client/cli/CliFrontendRunTest.java    |  2 +-
 .../runtime/webmonitor/handlers/JarRunHandler.java     |  9 ++++++++-
 .../runtime/webmonitor/handlers/JarRunRequestBody.java | 18 ++++++++++++++++--
 .../handlers/JarRunHandlerParameterTest.java           |  6 ++++--
 .../webmonitor/handlers/JarRunRequestBodyTest.java     | 11 ++++++++++-
 .../src/test/resources/rest_api_v1.snapshot            |  6 +++++-
 7 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4e4b8ac..d4f7354 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -869,6 +869,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
         "type" : "string"
       }
     },
+    "restoreMode" : {
+      "type" : "string",
+      "enum" : [ "CLAIM", "LEGACY" ]
+    },
     "savepointPath" : {
       "type" : "string"
     }
@@ -1028,6 +1032,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
         "type" : "object",
         "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
         "properties" : {
+          "mtime" : {
+            "type" : "integer"
+          },
           "name" : {
             "type" : "string"
           },
@@ -5729,6 +5736,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
         "type" : "object",
         "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
         "properties" : {
+          "mtime" : {
+            "type" : "integer"
+          },
           "name" : {
             "type" : "string"
           },
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index bf47053..861bb1f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -166,7 +166,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 
         SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
         assertTrue(savepointSettings.restoreSavepoint());
-        assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode());
+        assertEquals(RestoreMode.LEGACY, savepointSettings.getRestoreMode());
         assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
         assertTrue(savepointSettings.allowNonRestoredState());
     }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index ad258d6..be57752 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -26,6 +26,8 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -40,6 +42,7 @@ import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -142,10 +145,14 @@ public class JarRunHandler
                                                 request, SavepointPathQueryParameter.class)),
                         null,
                         log);
+        final RestoreMode restoreMode =
+                Optional.ofNullable(requestBody.getRestoreMode())
+                        .orElseGet(SavepointConfigOptions.RESTORE_MODE::defaultValue);
         final SavepointRestoreSettings savepointRestoreSettings;
         if (savepointPath != null) {
             savepointRestoreSettings =
-                    SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
+                    SavepointRestoreSettings.forPath(
+                            savepointPath, allowNonRestoredState, restoreMode);
         } else {
             savepointRestoreSettings = SavepointRestoreSettings.none();
         }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
index 2ca235d..8e8e3fd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -35,6 +36,7 @@ import java.util.List;
 public class JarRunRequestBody extends JarRequestBody {
     private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
     private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+    private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";
 
     @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
     @Nullable
@@ -44,8 +46,12 @@ public class JarRunRequestBody extends JarRequestBody {
     @Nullable
     private String savepointPath;
 
+    @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
+    @Nullable
+    private RestoreMode restoreMode;
+
     public JarRunRequestBody() {
-        this(null, null, null, null, null, null, null);
+        this(null, null, null, null, null, null, null, null);
     }
 
     @JsonCreator
@@ -58,10 +64,12 @@ public class JarRunRequestBody extends JarRequestBody {
             @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
             @Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
                     Boolean allowNonRestoredState,
-            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
+            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath,
+            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode) {
         super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
         this.allowNonRestoredState = allowNonRestoredState;
         this.savepointPath = savepointPath;
+        this.restoreMode = restoreMode;
     }
 
     @Nullable
@@ -75,4 +83,10 @@ public class JarRunRequestBody extends JarRequestBody {
     public String getSavepointPath() {
         return savepointPath;
     }
+
+    @Nullable
+    @JsonIgnore
+    public RestoreMode getRestoreMode() {
+        return restoreMode;
+    }
 }
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 5ad4d60..01835b0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -182,12 +183,13 @@ public class JarRunHandlerParameterTest
                 PARALLELISM,
                 null,
                 ALLOW_NON_RESTORED_STATE_QUERY,
-                RESTORE_PATH);
+                RESTORE_PATH,
+                RestoreMode.CLAIM);
     }
 
     @Override
     JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) {
-        return new JarRunRequestBody(null, null, null, null, jobId, null, null);
+        return new JarRunRequestBody(null, null, null, null, jobId, null, null, null);
     }
 
     @Test
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
index 38c964a..bec96cc 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
 
 import java.util.Arrays;
@@ -36,7 +37,14 @@ public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRun
     @Override
     protected JarRunRequestBody getTestRequestInstance() {
         return new JarRunRequestBody(
-                "hello", "world", Arrays.asList("boo", "far"), 4, new JobID(), true, "foo/bar");
+                "hello",
+                "world",
+                Arrays.asList("boo", "far"),
+                4,
+                new JobID(),
+                true,
+                "foo/bar",
+                RestoreMode.CLAIM);
     }
 
     @Override
@@ -49,5 +57,6 @@ public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRun
         assertEquals(expected.getJobId(), actual.getJobId());
         assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
         assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
+        assertEquals(expected.getRestoreMode(), actual.getRestoreMode());
     }
 }
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 279adbb..8ca5d08 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -435,6 +435,10 @@
         },
         "savepointPath" : {
           "type" : "string"
+        },
+        "restoreMode" : {
+          "type" : "string",
+          "enum" : [ "CLAIM", "LEGACY" ]
         }
       }
     },
@@ -3374,4 +3378,4 @@
       }
     }
   } ]
-}
\ No newline at end of file
+}