You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/06/23 11:59:41 UTC

flink git commit: [FLINK-4046] [runtime] Add direct state transition from RESTARTING to FAILED

Repository: flink
Updated Branches:
  refs/heads/master 37defbb42 -> cfe629340


[FLINK-4046] [runtime] Add direct state transition from RESTARTING to FAILED

A job can get stuck in FAILING if fail is called on a RESTARTING job which has
not yet reset its ExecutionJobVertices: these vertices never call
jobVertexInFinalState, but that call is required to transition from FAILING to
FAILED. To avoid this, fail now transitions a RESTARTING job directly to FAILED.

Accept state FAILED when calling ExecutionGraph.restart: the restart is aborted
instead of throwing an IllegalStateException.

This closes #2095.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfe62934
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfe62934
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfe62934

Branch: refs/heads/master
Commit: cfe629340205586d54115143ee30428c1f772abc
Parents: 37defbb
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 9 15:54:01 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 23 13:58:24 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 17 ++++--
 .../ExecutionGraphRestartTest.java              | 59 ++++++++++++++++++++
 2 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfe62934/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3a2dbef..b11f51d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -810,8 +810,15 @@ public class ExecutionGraph implements Serializable {
 			JobStatus current = state;
 			if (current == JobStatus.FAILING || current.isTerminalState()) {
 				return;
-			}
-			else if (transitionState(current, JobStatus.FAILING, t)) {
+			} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
+				synchronized (progressLock) {
+					postRunCleanup();
+					progressLock.notifyAll();
+
+					LOG.info("Job {} failed during restart.", getJobID());
+					return;
+				}
+			} else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
 				if (!verticesInCreationOrder.isEmpty()) {
@@ -839,8 +846,10 @@ public class ExecutionGraph implements Serializable {
 				if (current == JobStatus.CANCELED) {
 					LOG.info("Canceled job during restart. Aborting restart.");
 					return;
-				}
-				else if (current != JobStatus.RESTARTING) {
+				} else if (current == JobStatus.FAILED) {
+					LOG.info("Failed job during restart. Aborting restart.");
+					return;
+				} else if (current != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cfe62934/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 01cca5c..687a46a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -265,6 +265,65 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	}
 
 	@Test
+	public void testFailWhileRestarting() throws Exception {
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+			new SimpleActorGateway(TestingUtils.directExecutionContext()),
+			NUM_TASKS);
+
+		scheduler.newInstanceAvailable(instance);
+
+		// Blocking program
+		ExecutionGraph executionGraph = new ExecutionGraph(
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"TestJob",
+			new Configuration(),
+			ExecutionConfigTest.getSerializedConfig(),
+			AkkaUtils.getDefaultTimeout(),
+			// We want to manually control the restart and delay
+			new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
+
+		JobVertex jobVertex = new JobVertex("NoOpInvokable");
+		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+		executionGraph.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		// Kill the instance and wait for the job to restart
+		instance.markDead();
+
+		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+		while (deadline.hasTimeLeft() &&
+			executionGraph.getState() != JobStatus.RESTARTING) {
+
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+		// Failing needs to abort the restart
+		executionGraph.fail(new Exception("Test exception"));
+
+		assertEquals(JobStatus.FAILED, executionGraph.getState());
+
+		// The restart has been aborted
+		executionGraph.restart();
+
+		assertEquals(JobStatus.FAILED, executionGraph.getState());
+	}
+
+	@Test
 	public void testCancelWhileFailing() throws Exception {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());