You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/09 12:49:36 UTC
[flink] 03/03: [FLINK-25155] Support claim mode in rest api
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7641c239b777273126c10efab334e3247771a28b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Dec 3 15:47:03 2021 +0100
[FLINK-25155] Support claim mode in rest api
---
.../shortcodes/generated/rest_v1_dispatcher.html | 10 ++++++++++
.../apache/flink/client/cli/CliFrontendRunTest.java | 2 +-
.../runtime/webmonitor/handlers/JarRunHandler.java | 9 ++++++++-
.../runtime/webmonitor/handlers/JarRunRequestBody.java | 18 ++++++++++++++++--
.../handlers/JarRunHandlerParameterTest.java | 6 ++++--
.../webmonitor/handlers/JarRunRequestBodyTest.java | 11 ++++++++++-
.../src/test/resources/rest_api_v1.snapshot | 6 +++++-
7 files changed, 54 insertions(+), 8 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4e4b8ac..d4f7354 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -869,6 +869,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "string"
}
},
+ "restoreMode" : {
+ "type" : "string",
+ "enum" : [ "CLAIM", "LEGACY" ]
+ },
"savepointPath" : {
"type" : "string"
}
@@ -1028,6 +1032,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
+ "mtime" : {
+ "type" : "integer"
+ },
"name" : {
"type" : "string"
},
@@ -5729,6 +5736,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
+ "mtime" : {
+ "type" : "integer"
+ },
"name" : {
"type" : "string"
},
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index bf47053..861bb1f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -166,7 +166,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
- assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode());
+ assertEquals(RestoreMode.LEGACY, savepointSettings.getRestoreMode());
assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
assertTrue(savepointSettings.allowNonRestoredState());
}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index ad258d6..be57752 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -26,6 +26,8 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -40,6 +42,7 @@ import javax.annotation.Nonnull;
import java.nio.file.Path;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
@@ -142,10 +145,14 @@ public class JarRunHandler
request, SavepointPathQueryParameter.class)),
null,
log);
+ final RestoreMode restoreMode =
+ Optional.ofNullable(requestBody.getRestoreMode())
+ .orElseGet(SavepointConfigOptions.RESTORE_MODE::defaultValue);
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings =
- SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
+ SavepointRestoreSettings.forPath(
+ savepointPath, allowNonRestoredState, restoreMode);
} else {
savepointRestoreSettings = SavepointRestoreSettings.none();
}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
index 2ca235d..8e8e3fd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -35,6 +36,7 @@ import java.util.List;
public class JarRunRequestBody extends JarRequestBody {
private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+ private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";
@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
@Nullable
@@ -44,8 +46,12 @@ public class JarRunRequestBody extends JarRequestBody {
@Nullable
private String savepointPath;
+ @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
+ @Nullable
+ private RestoreMode restoreMode;
+
public JarRunRequestBody() {
- this(null, null, null, null, null, null, null);
+ this(null, null, null, null, null, null, null, null);
}
@JsonCreator
@@ -58,10 +64,12 @@ public class JarRunRequestBody extends JarRequestBody {
@Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
Boolean allowNonRestoredState,
- @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
+ @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath,
+ @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode) {
super(entryClassName, programArguments, programArgumentsList, parallelism, jobId);
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
+ this.restoreMode = restoreMode;
}
@Nullable
@@ -75,4 +83,10 @@ public class JarRunRequestBody extends JarRequestBody {
public String getSavepointPath() {
return savepointPath;
}
+
+ @Nullable
+ @JsonIgnore
+ public RestoreMode getRestoreMode() {
+ return restoreMode;
+ }
}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 5ad4d60..01835b0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -182,12 +183,13 @@ public class JarRunHandlerParameterTest
PARALLELISM,
null,
ALLOW_NON_RESTORED_STATE_QUERY,
- RESTORE_PATH);
+ RESTORE_PATH,
+ RestoreMode.CLAIM);
}
@Override
JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) {
- return new JarRunRequestBody(null, null, null, null, jobId, null, null);
+ return new JarRunRequestBody(null, null, null, null, jobId, null, null, null);
}
@Test
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
index 38c964a..bec96cc 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
import java.util.Arrays;
@@ -36,7 +37,14 @@ public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRun
@Override
protected JarRunRequestBody getTestRequestInstance() {
return new JarRunRequestBody(
- "hello", "world", Arrays.asList("boo", "far"), 4, new JobID(), true, "foo/bar");
+ "hello",
+ "world",
+ Arrays.asList("boo", "far"),
+ 4,
+ new JobID(),
+ true,
+ "foo/bar",
+ RestoreMode.CLAIM);
}
@Override
@@ -49,5 +57,6 @@ public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRun
assertEquals(expected.getJobId(), actual.getJobId());
assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
+ assertEquals(expected.getRestoreMode(), actual.getRestoreMode());
}
}
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 279adbb..8ca5d08 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -435,6 +435,10 @@
},
"savepointPath" : {
"type" : "string"
+ },
+ "restoreMode" : {
+ "type" : "string",
+ "enum" : [ "CLAIM", "LEGACY" ]
}
}
},
@@ -3374,4 +3378,4 @@
}
}
} ]
-}
\ No newline at end of file
+}