You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:53 UTC
[42/50] [abbrv] flink git commit: [hotfix] [dist. coordination] Add
safety check for execution graph state transitions
[hotfix] [dist. coordination] Add safety check for execution graph state transitions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60895a3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60895a3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60895a3c
Branch: refs/heads/table-retraction
Commit: 60895a3ccd83609088be6ecef3445f7c78c9955a
Parents: 874d956
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 22 19:53:50 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 8 +++++++
.../flink/runtime/jobgraph/JobStatus.java | 22 +++++++++++++++++++-
2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e911f49..1c7b1c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1105,6 +1105,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
+ // consistency check
+ if (current.isTerminalState()) {
+ String message = "Job is trying to leave terminal state " + current;
+ LOG.error(message);
+ throw new IllegalStateException(message);
+ }
+
+ // now do the actual state transition
if (STATE_UPDATER.compareAndSet(this, current, newState)) {
LOG.info("Job {} ({}) switched from state {} to {}.", getJobName(), getJobID(), current, newState, error);
http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 6a0ac97..4ef86bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -69,11 +69,31 @@ public enum JobStatus {
JobStatus(TerminalState terminalState) {
this.terminalState = terminalState;
}
-
+
+ /**
+ * Checks whether this state is <i>globally terminal</i>. A globally terminal job
+ * is complete and cannot fail any more and will not be restarted or recovered by another
+ * standby master node.
+ *
+ * <p>When a globally terminal state has been reached, all recovery data for the job is
+ * dropped from the high-availability services.
+ *
+ * @return True, if this job status is globally terminal, false otherwise.
+ */
public boolean isGloballyTerminalState() {
return terminalState == TerminalState.GLOBALLY;
}
+ /**
+ * Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the
+ * state of a job's execution graph within an executing JobManager. If the execution graph
+ * is locally terminal, the JobManager will not continue executing or recovering the job.
+ *
+ * <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
* which is typically entered when the executing JobManager loses its leader status.
+ *
+ * @return True, if this job status is terminal, false otherwise.
+ */
public boolean isTerminalState() {
return terminalState != TerminalState.NON_TERMINAL;
}