You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/11/19 18:23:38 UTC

[3/5] flink git commit: [FLINK-3011] [runtime] Disallow ExecutionGraph state transition from FAILED to RESTARTING

[FLINK-3011] [runtime] Disallow ExecutionGraph state transition from FAILED to RESTARTING

Removes the possibility to go from FAILED state back to RESTARTING. This was only used in a test
case. It was a breaking the terminal state semantics of the FAILED state.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a402002d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a402002d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a402002d

Branch: refs/heads/master
Commit: a402002df7b6a3f21d84f9338b8b0afec1cc65ec
Parents: d8ab8bc
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Nov 16 16:18:20 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:22:03 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/ExecutionGraph.java   |  6 ------
 .../executiongraph/ExecutionGraphRestartTest.scala     | 13 ++++---------
 2 files changed, 4 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a402002d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9430d80..aae0b7c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -746,12 +746,6 @@ public class ExecutionGraph implements Serializable {
 
 	public void restart() {
 		try {
-			if (state == JobStatus.FAILED) {
-				if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
-					throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
-				}
-			}
-
 			synchronized (progressLock) {
 				if (state != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");

http://git-wip-us.apache.org/repos/asf/flink/blob/a402002d/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index e41d7ff..8fb3c4e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -43,7 +43,7 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
   val NUM_TASKS = 31
 
   "The execution graph" must {
-    "be manually restartable" in {
+    "not be manually restartable" in {
       try {
         val instance = ExecutionGraphTestUtils.getInstance(
           new SimpleActorGateway(TestingUtils.directExecutionContext),
@@ -73,21 +73,16 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         eg.getState should equal(JobStatus.RUNNING)
 
         eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-        
+
         for (vertex <- eg.getAllExecutionVertices().asScala) {
           vertex.getCurrentExecutionAttempt().cancelingComplete()
         }
-        
+
         eg.getState should equal(JobStatus.FAILED)
 
         eg.restart()
-        eg.getState should equal(JobStatus.RUNNING)
-        
-        for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.getCurrentExecutionAttempt().markFinished()
-        }
 
-        eg.getState should equal(JobStatus.FINISHED)
+        eg.getState should equal(JobStatus.FAILED)
       } catch {
         case t: Throwable =>
           t.printStackTrace()