Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/10/10 11:57:51 UTC
[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/4789
[FLINK-7780] [REST][Client] Define protocol for triggering savepoints
Based on #4788.
## What is the purpose of the change
This PR includes all the client-side changes necessary to trigger savepoints with FLIP-6, including the message headers. It does NOT include a port of the savepoint handlers.
Do note that the REST protocol is _incompatible_ with the existing savepoint handlers. For one, the request is now a POST instead of a GET (as it should be). In addition, the savepoint target directory is no longer a path parameter but a query parameter (for the reasoning, see SavepointTriggerHeaders#getTargetRestEndpointURL). A recent discussion on the mailing list also proposed making this a query parameter.
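To illustrate why an optional target directory fits a query parameter, here is a minimal sketch of how a client could resolve the request URL. The URL template, the `targetDirectory` key, and the helper class are assumptions made for this example; they are not taken from the PR and are not Flink API.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper for illustration only; not part of Flink. */
final class SavepointTriggerUrlSketch {

    // Assumed URL template and query key; the real values live in SavepointTriggerHeaders.
    static final String URL_TEMPLATE = "/jobs/:jobid/savepoints";
    static final String TARGET_DIRECTORY_KEY = "targetDirectory";

    /** Resolves the mandatory path parameter and appends the optional query parameter. */
    static String resolve(String jobId, String targetDirectory) {
        String url = URL_TEMPLATE.replace(":jobid", jobId);
        if (targetDirectory == null) {
            // An omitted query parameter leaves the path untouched.
            return url;
        }
        try {
            // Encoding keeps the slashes of the directory out of the URL path.
            return url + "?" + TARGET_DIRECTORY_KEY + "=" + URLEncoder.encode(targetDirectory, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("abc123", null));
        System.out.println(resolve("abc123", "hdfs:///flink/savepoints"));
    }
}
```

With the directory as a query parameter, the path `/jobs/:jobid/savepoints` is the same whether or not a directory is given, which is harder to guarantee when the directory is an optional path segment.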
## Brief change log
* refactor/add utility classes for easier testing
* move savepoint logic from CliFrontend into ClusterClient (as we did with stop/cancel in ad380463d3d44cdd98302bf072bc5deba8696b5b)
* define REST protocol for triggering savepoints and integrate it into the `RestClusterClient`
## Verifying this change
This change added tests and can be verified as follows:
* the changes to the CliFrontend are covered by modified tests in CliFrontendSavepointTest
* the changes to the ClusterClient are covered by new tests in ClusterClientTest
* the changes to the RestClusterClient are covered by RestClusterClientTest#testTriggerSavepoint
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 7780
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4789.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4789
----
commit 237140e0799bf20869fcb2bc168e05e879d23895
Author: zentol <ch...@apache.org>
Date: 2017-10-10T10:57:05Z
[FLINK-7790] [REST] Unresolved query params not added to request URL
commit 03418468b7eba607d3df31870da6651da479455b
Author: zentol <ch...@apache.org>
Date: 2017-10-09T11:06:06Z
[refactor] [tests] Refactor CliFrontend mocking into utility class
commit 88010f49dcf452186d56cf9575835a3d738ddff5
Author: zentol <ch...@apache.org>
Date: 2017-10-10T11:22:59Z
[refactor] [tests] Generalize gateway mocking in ClusterClientTest
commit 9949452a5c656266008dec8718e32db11c05fbe3
Author: zentol <ch...@apache.org>
Date: 2017-10-10T11:23:21Z
[refactor] [tests] Generalize test handler generation
commit ffcd614f4c5b55f84cbe8f2c591c138ea21d588b
Author: zentol <ch...@apache.org>
Date: 2017-10-09T11:34:52Z
[FLINK-7780] [Client] Move savepoint logic into ClusterClient
commit ed5e241e38bc0af8bdde49483dc6a673f09fd0a3
Author: zentol <ch...@apache.org>
Date: 2017-10-09T16:09:36Z
[FLINK-7780] [REST] Define savepoint trigger protocol
----
---
[GitHub] flink issue #4789: [FLINK-7780] [REST][Client] Define protocol for triggerin...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/4789
merging.
---
[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4789#discussion_r143731232
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * These headers define the protocol for triggering a savepoint.
+ */
+public class SavepointTriggerHeaders implements MessageHeaders<EmptyRequestBody, SavepointTriggerResponseBody, SavepointMessageParameters> {
+
+ private static final SavepointTriggerHeaders INSTANCE = new SavepointTriggerHeaders();
+
+ private SavepointTriggerHeaders() {
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<SavepointTriggerResponseBody> getResponseClass() {
+ return SavepointTriggerResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.ACCEPTED;
+ }
+
+ @Override
+ public SavepointMessageParameters getUnresolvedMessageParameters() {
+ return new SavepointMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ /*
+ Note: this is different to the existing implementation for which the targetDirectory is a path parameter
+ Having it as a path parameter has several downsides as it
+ - is optional (which we only allow for query parameters)
+ - causes parsing issues, since the path is not reliably treated as a single parameter
+ - does not denote a hierarchy which path parameters are supposed to do
+ - interacts badly with the POST spec, as it would require the progress url to also contain the targetDirectory
+ */
--- End diff --
Good thing to change the old behaviour!
---
[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4789
---
[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4789#discussion_r143728931
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -726,35 +724,29 @@ protected int savepoint(String[] args) {
*/
private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
try {
- ActorGateway jobManager = getJobManagerGateway(options);
-
- logAndSysout("Triggering savepoint for job " + jobId + ".");
- Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
- new FiniteDuration(1, TimeUnit.HOURS));
-
- Object result;
+ CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+ ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
try {
- logAndSysout("Waiting for response...");
- result = Await.result(response, FiniteDuration.Inf());
- }
- catch (Exception e) {
- throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
- }
+ logAndSysout("Triggering savepoint for job " + jobId + ".");
+ CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
- if (result instanceof TriggerSavepointSuccess) {
- TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
- logAndSysout("Savepoint completed. Path: " + success.savepointPath());
+ String savepointPath;
+ try {
+ logAndSysout("Waiting for response...");
+ savepointPath = savepointPathFuture.get();
+ }
+ catch (ExecutionException ee) {
+ Throwable cause = ExceptionUtils.stripExecutionException(ee);
+ throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", cause);
--- End diff --
`FlinkException`
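The comment presumably asks for a more specific exception type than the generic `Exception` thrown in the diff. A minimal, self-contained sketch of that pattern follows; `SavepointException` is a hypothetical stand-in for `FlinkException`, and the cause is unwrapped with plain `getCause()` rather than Flink's `ExceptionUtils`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch; class and method names are not taken from the PR. */
final class SavepointFailureSketch {

    /** Stand-in for a domain-specific checked exception such as FlinkException. */
    static final class SavepointException extends Exception {
        SavepointException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Waits for the savepoint path and rethrows failures with a specific exception type. */
    static String awaitSavepointPath(String jobId, CompletableFuture<String> future)
            throws SavepointException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException ee) {
            // Strip the ExecutionException wrapper so callers see the actual failure.
            Throwable cause = ee.getCause() != null ? ee.getCause() : ee;
            throw new SavepointException(
                "Triggering a savepoint for the job " + jobId + " failed.", cause);
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("coordinator unavailable"));
        try {
            awaitSavepointPath("abc123", failed);
        } catch (SavepointException e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```

Callers can then catch the specific type instead of a blanket `Exception`, while the original failure stays available as the cause.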
---