You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/06/13 13:34:24 UTC

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2096

    [FLINK-3800] [runtime] Introduce SUSPENDED job status

    The SUSPENDED job status is a new ExecutionGraph state which can be reached from all
    non-terminal states when calling suspend on the ExecutionGraph. Unlike the FAILED,
    FINISHED and CANCELED state, the SUSPENDED state does not trigger the deletion of the
    job from the HA storage. Therefore, this state can be used to handle the loss of
    leadership or the shutdown of a JobManager so that the ExecutionGraph is stopped but
    can still be recovered. SUSPENDED is also a terminal state but it can be differentiated as
    a locally terminal state from FAILED, CANCELED and FINISHED which are globally
    terminal states.
    
    Add test case for suspend signal
    
    Add test case for suspending restarting job
    
    Add test case for HA job recovery when losing leadership
    
    Add online documentation for the job status

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixHALifecycle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2096.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2096
    
----
commit 0d3c738e85fed2e161bc724887a4d8ce06a2798c
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-06-09T09:37:14Z

    [FLINK-3800] [runtime] Introduce SUSPENDED job status
    
    The SUSPENDED job status is a new ExecutionGraph state which can be reached from all
    non-terminal states when calling suspend on the ExecutionGraph. Unlike the FAILED,
    FINISHED and CANCELED state, the SUSPENDED state does not trigger the deletion of the
    job from the HA storage. Therefore, this state can be used to handle the loss of
    leadership or the shutdown of a JobManager so that the ExecutionGraph is stopped but
    can still be recovered. SUSPENDED is also a terminal state but it can be differentiated as
    a locally terminal state from FAILED, CANCELED and FINISHED which are globally
    terminal states.
    
    Add test case for suspend signal
    
    Add test case for suspending restarting job
    
    Add test case for HA job recovery when losing leadership
    
    Add online documentation for the job status

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68039809
  
    --- Diff: docs/internals/job_scheduling.md ---
    @@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
     <img src="fig/job_and_execution_graph.svg" alt="JobGraph and ExecutionGraph" height="400px" style="text-align: center;"/>
     </div>
     
    -During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the 
    +Each ExecutionGraph has a job status associated with it.
    +This job status indicates the current state of the job execution.
    +
    +A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
    +In case of failures, a job switches first to *failing* where it cancels all running tasks.
    +If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
    +If the job can be restarted, then it will enter the *restarting* state.
    +Once the job has been completely restarted, it will reach the *created* state.
    +
    +In case that the user cancels the job, it will go into the *cancelling* state.
    +This is also entails the cancellation of all currently running tasks.
    --- End diff --
    
    Typo: This **~~is~~** also entails...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job status

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2096
  
    Thanks for your thorough review @uce.
    
    - We have talked offline concerning the status in the web interface and it turned out to be not a problem since the jobs are directly removed from `currentJobs` in the `JobManager` when `cancelAndClearEverything` is called. This implies that the jobs will no longer be shown in the web interface.
    
    - I agree that it is a good idea to replace the `SuppressRestartException` by a mechanism to disable the `RestartStrategies`. I've opened a [JIRA issue](https://issues.apache.org/jira/browse/FLINK-4112) to keep track of the effort. I think, however, that the mechanism should be implemented as part of the work on this issue. That way we won't mix pull requests with each other.
    
    - You're right. I've addressed the comment and added `SUSPENDED` to the set of states which don't throw an exception when encountered in `ExecutionGraph.restart`.
    
    - You're right concerning the `JobManagerProcess` tests. Maybe we can refactor some of them in the future by applying a similar pattern as it was used in `JobManagerHARecoveryTest`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68039992
  
    --- Diff: docs/internals/job_scheduling.md ---
    @@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
     <img src="fig/job_and_execution_graph.svg" alt="JobGraph and ExecutionGraph" height="400px" style="text-align: center;"/>
     </div>
     
    -During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the 
    +Each ExecutionGraph has a job status associated with it.
    +This job status indicates the current state of the job execution.
    +
    +A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
    +In case of failures, a job switches first to *failing* where it cancels all running tasks.
    +If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
    +If the job can be restarted, then it will enter the *restarting* state.
    +Once the job has been completely restarted, it will reach the *created* state.
    +
    +In case that the user cancels the job, it will go into the *cancelling* state.
    +This is also entails the cancellation of all currently running tasks.
    +Once all running tasks have reached a final state, the job transitions to the state *cancelled*.
    +
    +Unlike the states *finished*, *canceled* and *failed* which denote a globally terminal state and, thus, trigger the clean up of the job, the *suspended* state is only locally terminal.
    +Locally terminal means that the execution of the job has been terminated on the respective JobManager but another JobManager of the Flink cluster can retrieve the job from the persistent HA store and restart it.
    +Consequently, a job which reaches the *suspended* state won't be completely cleaned up.
    +
    +<div style="text-align: center;">
    +<img src="fig/job_status.svg" alt="States and Transitions of Flink job" height="500px" style="text-align: center;"/>
    --- End diff --
    
    Very nice figure! This will future contributors a lot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job status

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2096
  
    Will be merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68221848
  
    --- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
    @@ -16,7 +16,7 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF, console
    +log4j.rootLogger=INFO, console
    --- End diff --
    
    Yes, of course. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68221836
  
    --- Diff: docs/internals/job_scheduling.md ---
    @@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
     <img src="fig/job_and_execution_graph.svg" alt="JobGraph and ExecutionGraph" height="400px" style="text-align: center;"/>
     </div>
     
    -During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the 
    +Each ExecutionGraph has a job status associated with it.
    +This job status indicates the current state of the job execution.
    +
    +A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
    +In case of failures, a job switches first to *failing* where it cancels all running tasks.
    +If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
    +If the job can be restarted, then it will enter the *restarting* state.
    +Once the job has been completely restarted, it will reach the *created* state.
    +
    +In case that the user cancels the job, it will go into the *cancelling* state.
    +This is also entails the cancellation of all currently running tasks.
    --- End diff --
    
    Thanks for spotting :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2096


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68222088
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1029,21 +1061,25 @@ else if (current == JobStatus.CANCELLING) {
     						}
     					}
     					else if (current == JobStatus.FAILING) {
    -						if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
    -							// double check in case that in the meantime a SuppressRestartsException was thrown
    -							if (restartStrategy.canRestart()) {
    -								restartStrategy.restart(this);
    -								break;
    -							} else {
    -								fail(new Exception("ExecutionGraph went into RESTARTING state but " +
    -									"then the restart strategy was disabled."));
    -							}
    -
    -						} else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
    +						boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
    +
    +						if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
    +							restartStrategy.restart(this);
    +							break;
    +						} else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
     							postRunCleanup();
     							break;
     						}
     					}
    +					else if (current == JobStatus.SUSPENDED) {
    +						// we've already cleaned up when entering the SUSPENDED state
    +						break;
    +					}
    +					else if (current.isGloballyTerminalState()) {
    +						LOG.warn("Job has entered globally terminal state without waiting for all " +
    +							"job vertices to reach final state.");
    +						break;
    +					}
     					else {
     						fail(new Exception("ExecutionGraph went into final state from state " + current));
    --- End diff --
    
    Yes you're right. The `break` is missing here. Will add it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68077449
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1029,21 +1061,25 @@ else if (current == JobStatus.CANCELLING) {
     						}
     					}
     					else if (current == JobStatus.FAILING) {
    -						if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
    -							// double check in case that in the meantime a SuppressRestartsException was thrown
    -							if (restartStrategy.canRestart()) {
    -								restartStrategy.restart(this);
    -								break;
    -							} else {
    -								fail(new Exception("ExecutionGraph went into RESTARTING state but " +
    -									"then the restart strategy was disabled."));
    -							}
    -
    -						} else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
    +						boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
    +
    +						if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
    +							restartStrategy.restart(this);
    +							break;
    +						} else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
     							postRunCleanup();
     							break;
     						}
     					}
    +					else if (current == JobStatus.SUSPENDED) {
    +						// we've already cleaned up when entering the SUSPENDED state
    +						break;
    +					}
    +					else if (current.isGloballyTerminalState()) {
    +						LOG.warn("Job has entered globally terminal state without waiting for all " +
    +							"job vertices to reach final state.");
    +						break;
    +					}
     					else {
     						fail(new Exception("ExecutionGraph went into final state from state " + current));
    --- End diff --
    
    Missing `break` I guess (this should not be called anyways and it was that way before your change)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2096#discussion_r68046896
  
    --- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
    @@ -16,7 +16,7 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF, console
    +log4j.rootLogger=INFO, console
    --- End diff --
    
    Should stay `OFF`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job status

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2096
  
    Really good changes and great that you added the `JobStatus` graph transitions figure to the docs.
    
    I looked at all changes and tests. Furthermore, I've tested this with lost ZooKeeper connections and it works as expected. The job is suspended and recovered smoothly. \U0001f44d 
    
    Some general notes:
    - The only thing that might be slightly confusing for users is that until the web frontend is notified about the leader changes, it shows the job status as FAILED until it disappears. This is probably some web interface quirk, because it does not recognize the new job status. I understand that you remove suspended job graphs and don't archive them (which makes sense as they are recovered later), but giving that it takes some time until all information propagates to the web frontend, we might want to temporarily show the state as suspended. We can do this as a follow up.
    
    - Regarding the removal of restart strategy disabling, I thought about whether we should leave it in and get rid of the `SuppressRestartsException` instead. That seems to be cleaner than the custom exception, which is checked in the `ExecutionGraph`. If we want to get rid of it (as a follow up), we should leave the disable call in.
    
    - If you think that https://github.com/apache/flink/pull/2095#issuecomment-227731879 is a valid point, then we should apply it for the suspended state as well.
    
    These comments are not blockers and I think this is good to merge. Great job!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job status

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2096
  
    PS As another follow up we can probably get rid of some of the `JobManagerProcess` tests, which tried -- in a way -- to work around the missing suspended state. Your approach in `JobManagerHARecoveryTest` is way better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---