You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/29 22:51:09 UTC

[2/7] flink git commit: [hotfix] [dist. coordination] Add safety check for execution graph state transitions

[hotfix] [dist. coordination] Add safety check for execution graph state transitions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60895a3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60895a3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60895a3c

Branch: refs/heads/master
Commit: 60895a3ccd83609088be6ecef3445f7c78c9955a
Parents: 874d956
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 22 19:53:50 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 17:11:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  8 +++++++
 .../flink/runtime/jobgraph/JobStatus.java       | 22 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e911f49..1c7b1c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1105,6 +1105,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
+		// consistency check
+		if (current.isTerminalState()) {
+			String message = "Job is trying to leave terminal state " + current;
+			LOG.error(message);
+			throw new IllegalStateException(message);
+		}
+
+		// now do the actual state transition
 		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
 			LOG.info("Job {} ({}) switched from state {} to {}.", getJobName(), getJobID(), current, newState, error);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/60895a3c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 6a0ac97..4ef86bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -69,11 +69,31 @@ public enum JobStatus {
 	JobStatus(TerminalState terminalState) {
 		this.terminalState = terminalState;
 	}
-	
+
+	/**
+	 * Checks whether this state is <i>globally terminal</i>. A globally terminal job
+	 * is complete and cannot fail any more and will not be restarted or recovered by another
+	 * standby master node.
+	 * 
+	 * <p>When a globally terminal state has been reached, all recovery data for the job is
+	 * dropped from the high-availability services.
+	 * 
+	 * @return True, if this job status is globally terminal, false otherwise.
+	 */
 	public boolean isGloballyTerminalState() {
 		return terminalState == TerminalState.GLOBALLY;
 	}
 
+	/**
+	 * Checks whether this state is <i>locally terminal</i>. Locally terminal refers to the
+	 * state of a job's execution graph within an executing JobManager. If the execution graph
+	 * is locally terminal, the JobManager will not continue executing or recovering the job. 
+	 *
+	 * <p>The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
+	 * which is typically entered when the executing JobManager loses its leader status.
+	 * 
+	 * @return True, if this job status is terminal, false otherwise.
+	 */
 	public boolean isTerminalState() {
 		return terminalState != TerminalState.NON_TERMINAL;
 	}