Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/06/28 09:02:56 UTC

[GitHub] flink pull request #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/6222

    [FLINK-8785][rest] Handle JobSubmissionExceptions

    ## What is the purpose of the change
    
    This PR modifies the `JobSubmitHandler` to handle exceptions contained in the future returned by `DispatcherGateway#submitJob`.
    
    An exception handler was added via `CompletableFuture#exceptionally`. It wraps the failure in a `RestHandlerException` so that a proper `ErrorResponseBody` is returned, signaling that the job submission has failed.
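
A minimal JDK-only sketch of the pattern described above. `HandlerError` is a hypothetical stand-in for `RestHandlerException` that only carries a message and an HTTP status code, and `SubmitSketch.handle` is a hypothetical stand-in for the handler method. Throwing a `CompletionException` from `exceptionally` makes the returned future fail with the wrapped exception as its cause, which is presumably what the REST layer then turns into the `ErrorResponseBody`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for RestHandlerException: a message plus an HTTP status code.
class HandlerError extends Exception {
    final int status;

    HandlerError(String message, int status, Throwable cause) {
        super(message, cause);
        this.status = status;
    }
}

class SubmitSketch {
    // Maps a successful acknowledgement to the job's path, and any failure
    // to a HandlerError with status 500 (internal server error).
    static CompletableFuture<String> handle(CompletableFuture<Void> submission, String jobId) {
        return submission
            .thenApply(ack -> "/jobs/" + jobId)
            .exceptionally(exception -> {
                throw new CompletionException(
                    new HandlerError("Job submission failed.", 500, exception));
            });
    }
}
```

Calling `join()` on the result of a failed submission throws a `CompletionException` whose cause is the `HandlerError` carrying status 500.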
    
    This PR is pretty much the bare-bones solution. In the JIRA I advocated for including the messages of the underlying exceptions, since there are various reasons why the submission could fail, but I couldn't find a satisfying way to do so.
    
    ## Verifying this change
    
    * See the new test `testFailedJobSubmission` in `JobSubmitHandlerTest`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8785_basic

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6222
    
----
commit 32fe49270596cdcf2f91f822c3a6504a14ba40eb
Author: zentol <ch...@...>
Date:   2018-06-28T08:57:01Z

    [FLINK-8785][rest] Handle JobSubmissionExceptions

----


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199171079
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
    @@ -87,4 +89,33 @@ public void testSuccessfulJobSubmission() throws Exception {
     		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
     			.get();
     	}
    +
    +	@Test
    +	public void testFailedJobSubmission() throws Exception {
    +		final String errorMessage = "test";
    +		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
    +		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
    +		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
    +
    +		JobSubmitHandler handler = new JobSubmitHandler(
    +			CompletableFuture.completedFuture("http://localhost:1234"),
    +			mockGatewayRetriever,
    +			RpcUtils.INF_TIMEOUT,
    +			Collections.emptyMap());
    +
    +		JobGraph job = new JobGraph("testjob");
    +		JobSubmitRequestBody request = new JobSubmitRequestBody(job);
    +
    +		try {
    +			handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
    +				.get();
    +		} catch (Exception e) {
    +			Throwable t = ExceptionUtils.stripExecutionException(e);
    +			if (t instanceof RestHandlerException){
    +				Assert.assertTrue(t.getMessage().equals("Job submission failed."));
    +			} else {
    +				throw e;
    +			}
    +		}
    --- End diff --
    
    I think we should make sure that `errorMessage` is part of the `RestHandlerException#message`. Otherwise this information won't be sent to the client in the form of the `ErrorResponseBody`.
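
One way to address this in the test, sketched with JDK classes only: assert on the failure explicitly, so that the test also fails when no exception is thrown (the version in the diff passes silently in that case), and check that the original error message reaches the client-facing message. `HandlerError` and `handleSubmission` are hypothetical stand-ins for `RestHandlerException` and the handler under test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for RestHandlerException.
class HandlerError extends Exception {
    HandlerError(String message, Throwable cause) {
        super(message, cause);
    }
}

class FailedSubmissionTestSketch {
    // Hypothetical handler that includes the cause's message in the response message.
    static CompletableFuture<String> handleSubmission(CompletableFuture<Void> submission) {
        return submission
            .thenApply(ack -> "/jobs/some-id")
            .exceptionally(e -> {
                // Failures from earlier stages arrive wrapped in a CompletionException.
                Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
                throw new CompletionException(
                    new HandlerError("Job submission failed: " + cause.getMessage(), cause));
            });
    }

    // Returns the HandlerError's message; throws AssertionError if the future
    // completed normally or failed with an unexpected exception type.
    static String expectHandlerError(CompletableFuture<String> result) {
        try {
            result.join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof HandlerError) {
                return e.getCause().getMessage();
            }
            throw new AssertionError("unexpected failure", e);
        }
        throw new AssertionError("expected the job submission to fail");
    }
}
```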


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199170668
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---
    @@ -87,4 +89,33 @@ public void testSuccessfulJobSubmission() throws Exception {
     		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
     			.get();
     	}
    +
    +	@Test
    +	public void testFailedJobSubmission() throws Exception {
    +		final String errorMessage = "test";
    +		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
    +		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
    +		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
    --- End diff --
    
    No need to create a mock. `() -> CompletableFuture.completedFuture(mockGateway)` should be good enough.
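
This works presumably because `GatewayRetriever` has a single abstract method, so a lambda can implement it directly. A JDK-only sketch of the idea, with a hypothetical `Retriever` interface standing in for `GatewayRetriever` and a hypothetical `Gateway` interface standing in for the mocked `DispatcherGateway`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for GatewayRetriever: one abstract method, so it is a functional interface.
@FunctionalInterface
interface Retriever<T> {
    CompletableFuture<T> getFuture();
}

class RetrieverSketch {
    // Hypothetical stand-in for the DispatcherGateway used in the test.
    interface Gateway {
        String name();
    }

    static Retriever<Gateway> retrieverFor(Gateway gateway) {
        // A lambda replaces the mocked retriever: it returns an already completed future.
        return () -> CompletableFuture.completedFuture(gateway);
    }
}
```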


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199170250
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -66,6 +67,9 @@ public JobSubmitHandler(
     		}
     
     		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
    +			.exceptionally(exception -> {
    +				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    --- End diff --
    
    Maybe we could add `exception.getMessage()` to the `message` of the `RestHandlerException`. Otherwise the user will only see `"Job submission failed."` in the `ErrorResponseBody`. With the change it could be `"Job submission failed: Failure cause"`.
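
A sketch of how such a message could be composed, assuming the handler first strips `CompletionException` layers. The helper name `describeFailure` is hypothetical. The fallback to the exception's class name covers exceptions without a message (for example, a bare `NullPointerException`), which would otherwise produce `"Job submission failed: null"`.

```java
import java.util.concurrent.CompletionException;

class FailureMessages {
    // Hypothetical helper: builds "<prefix>: <cause message>" for the client-facing error.
    static String describeFailure(String prefix, Throwable failure) {
        Throwable cause = failure;
        // Strip CompletionException layers added by CompletableFuture stages.
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        // Fall back to the simple class name when the exception carries no message.
        String detail = cause.getMessage() != null ? cause.getMessage() : cause.getClass().getSimpleName();
        return prefix + ": " + detail;
    }
}
```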


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199477966
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -66,6 +67,9 @@ public JobSubmitHandler(
     		}
     
     		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
    +			.exceptionally(exception -> {
    +				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    --- End diff --
    
    Well, there's no doubt that it _could_ be helpful; my point is that it can be _harmful_ if not done properly.
    
    `submitJob` should either provide the `JobSubmitHandler` with a means to detect these exceptions and create adequate responses, or explicitly throw exceptions with messages that we can safely pass on to users.
    
    That said, I do not know how to do either of these things in a good way. 😞 
    
    For completeness' sake, here are the ideas that came to mind:
    
    ## 1
    Introduce a special `FlinkUserFacingException` that we "trust" to contain a good error message.
    
    Con: This provides little additional safety and will never provide a proper HTTP response code.
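
A sketch of idea 1, assuming the handler searches the whole cause chain for the marker type. `FlinkUserFacingException` is the hypothetical class proposed above, not an existing Flink type, and `toClientMessage` is an illustrative helper. As the con notes, the marker says nothing about the HTTP status, so this code can only vary the message.

```java
import java.util.Optional;

// Hypothetical marker exception whose message is trusted to be safe for users.
class FlinkUserFacingException extends Exception {
    FlinkUserFacingException(String message) {
        super(message);
    }
}

class UserFacingSketch {
    // Walks the cause chain and returns the first exception of the given type, if any.
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }

    // Passes trusted messages through; everything else gets the generic message.
    static String toClientMessage(Throwable failure) {
        return findCause(failure, FlinkUserFacingException.class)
            .map(Throwable::getMessage)
            .orElse("Job submission failed.");
    }
}
```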
    
    ## 2
    Introduce dedicated exceptions for the scenarios that you listed and explicitly look for them in the `exceptionally` block, i.e.:
    ```
    .exceptionally(exception -> {
    	// failures from earlier stages arrive wrapped in a CompletionException
    	Throwable cause = exception instanceof CompletionException ? exception.getCause() : exception;
    	if (cause instanceof JobAlreadyExistsException) {
    		throw new CompletionException(new RestHandlerException("Job already exists.", HttpResponseStatus.BAD_REQUEST, cause));
    	} else {
    		throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause));
    	}
    });
    ```
    
    Con: Obviously, this approach is inherently flawed, as there is no guarantee that a given exception can actually be thrown; we would have to manually keep the checks in sync with the actual implementation, because `CompletableFuture`s throw a wrench into sane exception handling. 😡
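
The wrapping behavior behind this complaint can be shown with the JDK alone. When the failure originates in an earlier stage, `exceptionally` on a dependent stage receives a `CompletionException` whose cause is the original exception. A plain `instanceof` check on the handler's argument therefore misses it unless the handler unwraps first. `JobAlreadyExistsException` is again a hypothetical type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical exception type from idea 2.
class JobAlreadyExistsException extends Exception {
    JobAlreadyExistsException(String message) {
        super(message);
    }
}

class WrappingDemo {
    // Records what exceptionally() sees after a thenApply stage:
    // [0] = the raw exception type, [1] = the type after unwrapping.
    static String[] observe() {
        CompletableFuture<Void> submission = new CompletableFuture<>();
        submission.completeExceptionally(new JobAlreadyExistsException("job 42"));

        String[] seen = new String[2];
        // The source is already complete, so both stages run synchronously here.
        submission
            .thenApply(ack -> "ok")
            .exceptionally(exception -> {
                seen[0] = exception.getClass().getSimpleName();
                Throwable cause = exception instanceof CompletionException ? exception.getCause() : exception;
                seen[1] = cause.getClass().getSimpleName();
                return "handled";
            });
        return seen;
    }
}
```

Attached directly to `submission`, the same handler would receive the raw `JobAlreadyExistsException`, so what the handler sees depends on the shape of the chain.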
    
    ## 3
    Encode possible user-facing exceptions in the return value of `submitJob`, i.e. return an `AckOrException`:
    ```
    public class AckOrException {
    	// holds exception, could also be a series of nullable fields
    	private final SuperEither<ExceptionA, ExceptionB, ExceptionC> exception; 
    	...
    	public void throwIfError() throws ExceptionA, ExceptionB, ExceptionC;
    }
    ```
    Con: Relies on users to call `throwIfError` and introduces an entirely separate channel for passing errors, but it would allow exception matching.
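
A minimal sketch of idea 3, using the "series of nullable fields" variant mentioned in the comment. `AckOrException`, `JobAlreadyExistsException`, `InvalidJobException` and `statusFor` are hypothetical, and the status codes are illustrative choices only. The checked `throws` clause on `throwIfError` is what makes exception matching possible on the receiving side.

```java
// Hypothetical user-facing exceptions that submitJob could report.
class JobAlreadyExistsException extends Exception {
    JobAlreadyExistsException(String message) { super(message); }
}

class InvalidJobException extends Exception {
    InvalidJobException(String message) { super(message); }
}

// Hypothetical return value of submitJob: an acknowledgement or exactly one known error.
final class AckOrException {
    private final JobAlreadyExistsException alreadyExists; // null unless this error occurred
    private final InvalidJobException invalidJob;          // null unless this error occurred

    private AckOrException(JobAlreadyExistsException alreadyExists, InvalidJobException invalidJob) {
        this.alreadyExists = alreadyExists;
        this.invalidJob = invalidJob;
    }

    static AckOrException ack() { return new AckOrException(null, null); }

    static AckOrException alreadyExists(JobAlreadyExistsException e) { return new AckOrException(e, null); }

    static AckOrException invalidJob(InvalidJobException e) { return new AckOrException(null, e); }

    // Rethrows the stored error, if any; the checked signature forces callers to handle each case.
    void throwIfError() throws JobAlreadyExistsException, InvalidJobException {
        if (alreadyExists != null) {
            throw alreadyExists;
        }
        if (invalidJob != null) {
            throw invalidJob;
        }
    }
}

class AckOrExceptionUsage {
    // Maps the outcome to an HTTP status code via ordinary catch clauses (illustrative codes).
    static int statusFor(AckOrException result) {
        try {
            result.throwIfError();
            return 202;
        } catch (JobAlreadyExistsException e) {
            return 409;
        } catch (InvalidJobException e) {
            return 400;
        }
    }
}
```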


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199719645
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -66,6 +67,9 @@ public JobSubmitHandler(
     		}
     
     		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
    +			.exceptionally(exception -> {
    +				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    --- End diff --
    
    Do note that this discussion isn't blocking the PR from being merged, as any resolution would effectively be an extension of it.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199333683
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -66,6 +67,9 @@ public JobSubmitHandler(
     		}
     
     		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
    +			.exceptionally(exception -> {
    +				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    --- End diff --
    
    I see your point. I'm just wondering whether some bits of context wouldn't be helpful on the client side when using the CLI. For example, if the job was misconfigured, or if it had already been submitted to the cluster in HA mode, it would be helpful for the user to know.


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199248513
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -66,6 +67,9 @@ public JobSubmitHandler(
     		}
     
     		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
    +			.exceptionally(exception -> {
    +				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    --- End diff --
    
    I'm not quite fond of the idea. As alluded to in the PR description and the JIRA, I agree that the error message isn't _helpful_, yet it is better than the current state.
    
    I rather like that, so far, the REST API has control over the error messages. This ensures that users only see messages that were actually meant for them.
    
    In contrast, exception messages are pretty much arbitrary. They may change at will, their audience isn't defined (user vs. dev), they may only be helpful if the full stack trace is present, they often have no message at all (see usages of `Preconditions`, or NPEs), and they typically only describe what went wrong, not why, how to fix it, or whether it even was a user error. Since this would break down the barrier between internal and user-facing messages, you would also run into cases where users have _no idea_ what the message even means. Finally, you end up with mismatches between the error message and the error code.
    
    To me, the underlying issue is that `submitJob` funnels all manner of exceptions into a `FlinkException`/`JobSubmissionException` that we can't do much with. We can neither categorize them, nor tell who is responsible (user vs. Flink), nor tell when in the process the failure occurred.
    Without diving into the implementation you don't _even know which exceptions are thrown_, but I suppose this is a general issue with `CompletableFuture`s.


---

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6222#discussion_r199783421
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -66,6 +67,9 @@ public JobSubmitHandler(
     		}
     
     		return gateway.submitJob(jobGraph, timeout)
    -			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    +			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
    +			.exceptionally(exception -> {
    +				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
    --- End diff --
    
    I would be in favor of approach 3 because we are doing something similar for the `JobExecutionResult`/`JobResult`. We could then throw the exception in the `RestClusterClient`. I also agree that this is something we can add as a follow-up. Can you please create a JIRA issue for this, @zentol?


---

[GitHub] flink issue #6222: [FLINK-8785][rest] Handle JobSubmissionExceptions

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6222
  
    Merging, will create the issue once I'm done.


---

Posted by zentol <gi...@git.apache.org>.
Github user zentol closed the pull request at:

    https://github.com/apache/flink/pull/6222


---