Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2018/04/30 16:45:03 UTC

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5944

    [FLINK-8900] [yarn] Set correct application status when job is finished

    ## What is the purpose of the change
    
    When finite Flink applications (batch jobs) are submitted to YARN in detached mode, the final application status reported to YARN is currently the same regardless of whether the job succeeded or failed, because the job's result is not passed to the logic that initiates the application shutdown.
    
    This PR forwards the final job status via a future on which the shutdown actions are registered.
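A minimal, self-contained sketch of the pattern described above, in plain Java: a dispatcher completes a `CompletableFuture<ApplicationStatus>` with the job's outcome, and an entrypoint registers its shutdown action on that future. `SketchDispatcher`, `SketchEntrypoint`, `onJobFinished` and `reportedStatus` are hypothetical names, and `ApplicationStatus` is a simplified local stand-in for Flink's enum; this is an illustration of the idea, not the code from this PR.

```java
import java.util.concurrent.CompletableFuture;

// Simplified local stand-in for Flink's ApplicationStatus (assumption).
enum ApplicationStatus { SUCCEEDED, FAILED }

// Hypothetical dispatcher: instead of only shutting down when the job ends,
// it completes a termination future with the job's final status.
class SketchDispatcher {
    private final CompletableFuture<ApplicationStatus> jobTerminationFuture = new CompletableFuture<>();

    CompletableFuture<ApplicationStatus> getJobTerminationFuture() {
        return jobTerminationFuture;
    }

    // Called once the job has finished; 'failure' is null if the job succeeded.
    void onJobFinished(Throwable failure) {
        jobTerminationFuture.complete(
                failure != null ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED);
    }
}

// Hypothetical entrypoint: its shutdown action receives the status carried by
// the future, so the reported status reflects the job instead of a fixed value.
class SketchEntrypoint {
    volatile ApplicationStatus reportedStatus;

    void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
        terminationFuture.thenAccept(status -> reportedStatus = status);
    }
}
```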
    
    ## Brief change log
    
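      - Introduce a job termination future (`jobTerminationFuture`) in the `MiniDispatcher`, completed with the final `ApplicationStatus` of the job
      - Register the shutdown actions on that future in `YarnJobClusterEntrypoint#registerShutdownActions`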
    
    ## Verifying this change
    
    ```
    bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar
    ```
    
      - Run the job described above on YARN and let it succeed; check that the final application status is SUCCEEDED.
    
      - Run the same job on YARN with an input path parameter that points to a non-existent file; check that the final application status is FAILED.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink yarn_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5944
    
----
commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7
Author: Stephan Ewen <se...@...>
Date:   2018-04-30T07:55:50Z

    [hotfix] [tests] Update log4j-test.properties
    
    Brings the logging definition in sync with other projects.
    Updates the classname for the suppressed logger in Netty to account for the new
    shading model introduced in Flink 1.4.

commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23
Author: Stephan Ewen <se...@...>
Date:   2018-04-27T16:57:27Z

    [FLINK-8900] [yarn] Set correct application status when job is finished

----


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185732563
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
    @@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
     		}
     	}
     
    +	@Override
    +	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
    +		terminationFuture.whenComplete((status, throwable) ->
    --- End diff --
    
    Will update


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185732507
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    +				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
    +						ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    +				jobTerminationFuture.complete(status);
    --- End diff --
    
    True, I had it like that initially, but found the above version more readable in the end, because we don't really use the serializedThrowable (making the map() a bit strange).


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185163799
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    --- End diff --
    
    `throwable` isn't used. If `jobResultFuture` cannot be completed exceptionally, `thenAccept` should be used. 
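A sketch of the `thenAccept` variant suggested here, assuming `jobResultFuture` never completes exceptionally. `JobResult` is modeled as a minimal stand-in that exposes the failure cause as an `Optional<Throwable>` (Flink's own `JobResult` exposes it as an `Optional` of a serialized throwable), and `ThenAcceptSketch.forwardStatus` is a hypothetical helper, not part of Flink.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Simplified local stand-in for Flink's ApplicationStatus (assumption).
enum ApplicationStatus { SUCCEEDED, FAILED }

// Minimal stand-in for Flink's JobResult: only the optional failure cause is modeled.
final class JobResult {
    private final Throwable failureCause; // null if the job succeeded

    JobResult(Throwable failureCause) {
        this.failureCause = failureCause;
    }

    Optional<Throwable> getSerializedThrowable() {
        return Optional.ofNullable(failureCause);
    }
}

final class ThenAcceptSketch {
    // thenAccept only runs the callback on normal completion,
    // so 'result' is never null inside the lambda.
    static void forwardStatus(CompletableFuture<JobResult> jobResultFuture,
                              CompletableFuture<ApplicationStatus> jobTerminationFuture) {
        jobResultFuture.thenAccept(result -> {
            ApplicationStatus status = result.getSerializedThrowable().isPresent()
                    ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
            jobTerminationFuture.complete(status);
        });
    }
}
```

With `whenComplete`, an exceptionally completed `jobResultFuture` would invoke the callback with `result == null`, and `result.getSerializedThrowable()` would throw a `NullPointerException`. With `thenAccept` the callback is skipped instead, which also leaves `jobTerminationFuture` incomplete; that trade-off is only acceptable if exceptional completion really cannot happen.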


---

[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5944
  
    The test failure is unrelated; it is caused by flakiness in an unrelated test.


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5944


---

[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5944
  
    I will try it out.


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185164034
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    +				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
    +						ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    +				jobTerminationFuture.complete(status);
    --- End diff --
    
    I think the functional way would be:
    
    ```
    				jobTerminationFuture.complete(result.getSerializedThrowable()
    					.map(serializedThrowable -> ApplicationStatus.FAILED)
    					.orElse(ApplicationStatus.SUCCEEDED));
    ```
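Both styles discussed in this thread compute the same status. A self-contained comparison, using `Optional<Throwable>` as a stand-in for the serialized throwable and a local `ApplicationStatus` enum; the class and method names are hypothetical.

```java
import java.util.Optional;

// Simplified local stand-in for Flink's ApplicationStatus (assumption).
enum ApplicationStatus { SUCCEEDED, FAILED }

final class StatusMapping {
    // Style used in the PR: test for presence, then pick a constant.
    static ApplicationStatus withTernary(Optional<Throwable> serializedThrowable) {
        return serializedThrowable.isPresent() ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    }

    // Style suggested in review: map a present value to FAILED, otherwise fall back to SUCCEEDED.
    // The lambda ignores its argument; only the presence of a value matters.
    static ApplicationStatus withMap(Optional<Throwable> serializedThrowable) {
        return serializedThrowable
                .map(t -> ApplicationStatus.FAILED)
                .orElse(ApplicationStatus.SUCCEEDED);
    }
}
```

The unused lambda parameter in `withMap` is the awkwardness of `map()` mentioned in the reply about readability.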


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185739056
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    +				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
    +						ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    +				jobTerminationFuture.complete(status);
    --- End diff --
    
    ok


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185732539
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    --- End diff --
    
    Will update


---

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185163871
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
    @@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
     		}
     	}
     
    +	@Override
    +	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
    +		terminationFuture.whenComplete((status, throwable) ->
    --- End diff --
    
    `throwable` isn't used. If `terminationFuture` cannot be completed exceptionally, `thenAccept` should be used. 
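The same choice applies to `registerShutdownActions`. A hypothetical sketch contrasting the two options: `thenAccept` when `terminationFuture` can only complete normally, and `whenComplete` when the `throwable` is actually inspected. `ShutdownRegistration`, `shutDownCluster`, the local `ApplicationStatus` enum and the fallback to `UNKNOWN` are illustrative assumptions, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified local stand-in for Flink's ApplicationStatus (assumption).
enum ApplicationStatus { SUCCEEDED, FAILED, UNKNOWN }

final class ShutdownRegistration {
    // Records every status handed to the hypothetical shutdown logic.
    final List<ApplicationStatus> shutdownCalls = new ArrayList<>();

    // Option A: the future never completes exceptionally, so thenAccept suffices
    // and 'status' is always non-null inside the callback.
    void registerWithThenAccept(CompletableFuture<ApplicationStatus> terminationFuture) {
        terminationFuture.thenAccept(this::shutDownCluster);
    }

    // Option B: whenComplete sees both outcomes; if it is used, the throwable
    // should be inspected, here by substituting a fallback status.
    void registerWithWhenComplete(CompletableFuture<ApplicationStatus> terminationFuture) {
        terminationFuture.whenComplete((status, throwable) ->
                shutDownCluster(throwable != null ? ApplicationStatus.UNKNOWN : status));
    }

    private void shutDownCluster(ApplicationStatus status) {
        shutdownCalls.add(status);
    }
}
```

With option A, an exceptional completion means the shutdown action never runs at all; option B still shuts down, but has to decide which status to report.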


---