You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/10/10 11:57:51 UTC

[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/4789

    [FLINK-7780] [REST][Client] Define protocol for triggering savepoints

    Based on #4788.
    
    ## What is the purpose of the change
    
    This PR includes all the client-side changes necessary to trigger savepoints with FLIP-6, including the message headers etc. . It does NOT include a port of the savepoint handlers.
    
    Do note that the REST protocol is _incompatible_ with the existing savepoint handlers. For one it is now a POST instead of GET (as it should be), and the savepoint target directory is no longer a path but a query parameter (for the reasoning, see SavepointTriggerHeaders#getTargetRestEndpointURL). There has also been a recent discussion on the mailing list to make this a query parameter.
    
    ## Brief change log
    
    * refactor/add utility classes for easier testing
    * move savepoint logic from CliFrontend into ClusterClient (as we did with stop/cancel in ad380463d3d44cdd98302bf072bc5deba8696b5b)
    * define REST protocol for triggering savepoints and integrate it into the `RestClusterClient`
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    * the changes to the CliFrontend are covered by modified tests in CliFrontendSavepointTest
    * the changes to the ClusterClient are covered by new tests in ClusterClientTest
    * the changes to the RestClusterClient are covered by RestClusterClientTest#testTriggerSavepoint
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 7780

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4789
    
----
commit 237140e0799bf20869fcb2bc168e05e879d23895
Author: zentol <ch...@apache.org>
Date:   2017-10-10T10:57:05Z

    [FLINK-7790] [REST] Unresolved query params not added to request URL

commit 03418468b7eba607d3df31870da6651da479455b
Author: zentol <ch...@apache.org>
Date:   2017-10-09T11:06:06Z

    [refactor] [tests] Refactor CliFrontend mocking into utility class

commit 88010f49dcf452186d56cf9575835a3d738ddff5
Author: zentol <ch...@apache.org>
Date:   2017-10-10T11:22:59Z

    [refactor] [tests] Generalize gateway mocking in ClusterClientTest

commit 9949452a5c656266008dec8718e32db11c05fbe3
Author: zentol <ch...@apache.org>
Date:   2017-10-10T11:23:21Z

    [refactor] [tests] Generalize test handler generation

commit ffcd614f4c5b55f84cbe8f2c591c138ea21d588b
Author: zentol <ch...@apache.org>
Date:   2017-10-09T11:34:52Z

    [FLINK-7780] [Client] Move savepoint logic into ClusterClient

commit ed5e241e38bc0af8bdde49483dc6a673f09fd0a3
Author: zentol <ch...@apache.org>
Date:   2017-10-09T16:09:36Z

    [FLINK-7780] [REST] Define savepoint trigger protocol

----


---

[GitHub] flink issue #4789: [FLINK-7780] [REST][Client] Define protocol for triggerin...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4789
  
    merging.


---

[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4789#discussion_r143731232
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerHeaders.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.job.savepoints;
    +
    +import org.apache.flink.runtime.rest.HttpMethodWrapper;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +/**
    + * These headers define the protocol for triggering a savepoint.
    + */
    +public class SavepointTriggerHeaders implements MessageHeaders<EmptyRequestBody, SavepointTriggerResponseBody, SavepointMessageParameters> {
    +
    +	private static final SavepointTriggerHeaders INSTANCE = new SavepointTriggerHeaders();
    +
    +	private SavepointTriggerHeaders() {
    +	}
    +
    +	@Override
    +	public Class<EmptyRequestBody> getRequestClass() {
    +		return EmptyRequestBody.class;
    +	}
    +
    +	@Override
    +	public Class<SavepointTriggerResponseBody> getResponseClass() {
    +		return SavepointTriggerResponseBody.class;
    +	}
    +
    +	@Override
    +	public HttpResponseStatus getResponseStatusCode() {
    +		return HttpResponseStatus.ACCEPTED;
    +	}
    +
    +	@Override
    +	public SavepointMessageParameters getUnresolvedMessageParameters() {
    +		return new SavepointMessageParameters();
    +	}
    +
    +	@Override
    +	public HttpMethodWrapper getHttpMethod() {
    +		return HttpMethodWrapper.POST;
    +	}
    +
    +	@Override
    +	public String getTargetRestEndpointURL() {
    +		/*
    +		Note: this is different to the existing implementation for which the targetDirectory is a path parameter
    +		Having it as a path parameter has several downsides as it
    +			- is optional (which we only allow for query parameters)
    +			- causes parsing issues, since the path is not reliably treated as a single parameter
    +			- does not denote a hierarchy which path parameters are supposed to do
    +			- interacts badly with the POST spec, as it would require the progress url to also contain the targetDirectory
    +		 */
    --- End diff --
    
    Good thing to change the old behaviour!


---

[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4789


---

[GitHub] flink pull request #4789: [FLINK-7780] [REST][Client] Define protocol for tr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4789#discussion_r143728931
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -726,35 +724,29 @@ protected int savepoint(String[] args) {
     	 */
     	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
     		try {
    -			ActorGateway jobManager = getJobManagerGateway(options);
    -
    -			logAndSysout("Triggering savepoint for job " + jobId + ".");
    -			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
    -					new FiniteDuration(1, TimeUnit.HOURS));
    -
    -			Object result;
    +			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    +			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
     			try {
    -				logAndSysout("Waiting for response...");
    -				result = Await.result(response, FiniteDuration.Inf());
    -			}
    -			catch (Exception e) {
    -				throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
    -			}
    +				logAndSysout("Triggering savepoint for job " + jobId + ".");
    +				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
     
    -			if (result instanceof TriggerSavepointSuccess) {
    -				TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
    -				logAndSysout("Savepoint completed. Path: " + success.savepointPath());
    +				String savepointPath;
    +				try {
    +					logAndSysout("Waiting for response...");
    +					savepointPath = savepointPathFuture.get();
    +				}
    +				catch (ExecutionException ee) {
    +					Throwable cause = ExceptionUtils.stripExecutionException(ee);
    +					throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", cause);
    --- End diff --
    
    `FlinkException`


---