You are viewing a plain-text version of this content. The link to the canonical version ("here" in the original page) was not preserved in this copy.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/11/19 18:41:45 UTC

[2/8] flink git commit: [FLINK-3011] [runtime] Fix cancel during restart

[FLINK-3011] [runtime] Fix cancel during restart


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf065a38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf065a38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf065a38

Branch: refs/heads/release-0.10
Commit: bf065a380ceee8d450d8188377041e5eab4b671c
Parents: 839ae19
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 11:56:42 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:27:03 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  29 +++-
 .../ExecutionGraphRestartTest.java              | 141 +++++++++++++++++++
 2 files changed, 169 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf065a38/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aae0b7c..1e5d02c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -711,6 +711,26 @@ public class ExecutionGraph implements Serializable {
 					return;
 				}
 			}
+			// Executions are being canceled. Go into cancelling and wait for
+			// all vertices to be in their final state.
+			else if (current == JobStatus.FAILING) {
+				if (transitionState(current, JobStatus.CANCELLING)) {
+					return;
+				}
+			}
+			// All vertices have been cancelled and it's safe to directly go
+			// into the canceled state.
+			else if (current == JobStatus.RESTARTING) {
+				synchronized (progressLock) {
+					if (transitionState(current, JobStatus.CANCELED)) {
+						postRunCleanup();
+						progressLock.notifyAll();
+
+						LOG.info("Canceled during restart.");
+						return;
+					}
+				}
+			}
 			else {
 				// no need to treat other states
 				return;
@@ -747,9 +767,16 @@ public class ExecutionGraph implements Serializable {
 	public void restart() {
 		try {
 			synchronized (progressLock) {
-				if (state != JobStatus.RESTARTING) {
+				JobStatus current = state;
+
+				if (current == JobStatus.CANCELED) {
+					LOG.info("Canceled job during restart. Aborting restart.");
+					return;
+				}
+				else if (current != JobStatus.RESTARTING) {
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
+
 				if (scheduler == null) {
 					throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf065a38/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 57b1829..a50aa2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -37,6 +38,9 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
 
 public class ExecutionGraphRestartTest {
 
@@ -158,4 +162,141 @@ public class ExecutionGraphRestartTest {
 			fail("Failed to wait until all execution attempts left the state DEPLOYING.");
 		}
 	}
+
+	@Test
+	public void testCancelWhileRestarting() throws Exception {
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		scheduler.newInstanceAvailable(instance);
+
+		// Execution graph under test; its single vertex runs Tasks.NoOpInvokable (see below)
+		ExecutionGraph executionGraph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"TestJob",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+
+		JobVertex jobVertex = new JobVertex("NoOpInvokable");
+		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+		// Unlimited retries but a maximal retry delay, so the test itself drives restart()
+		executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+		executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+		executionGraph.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		// Kill the instance and poll (until the deadline) for the job to reach RESTARTING
+		instance.markDead();
+
+		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+		while (deadline.hasTimeLeft() &&
+				executionGraph.getState() != JobStatus.RESTARTING) {
+
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+		// Cancelling a RESTARTING job must abort the restart and end in CANCELED
+		executionGraph.cancel();
+
+		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+
+		// A subsequent restart() call must not revive the job; it stays CANCELED
+		executionGraph.restart();
+
+		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+	}
+
+	@Test
+	@Test
+	public void testCancelWhileFailing() throws Exception {
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		scheduler.newInstanceAvailable(instance);
+
+		// Execution graph under test; its single vertex runs Tasks.NoOpInvokable (see below)
+		ExecutionGraph executionGraph = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"TestJob",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+
+		// Spy on the graph so that jobVertexInFinalState() can be stubbed out
+		executionGraph = spy(executionGraph);
+
+		// Do nothing here, because we don't want to transition out of
+		// the FAILING state.
+		doNothing().when(executionGraph).jobVertexInFinalState();
+
+		JobVertex jobVertex = new JobVertex("NoOpInvokable");
+		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		jobVertex.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+		// Unlimited retries but a maximal retry delay, so no restart happens during the test
+		executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+		executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+		executionGraph.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		// Kill the instance...
+		instance.markDead();
+
+		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+		// ...and wait for all vertices to be in state FAILED. The
+		// jobVertexInFinalState does nothing, that's why we don't wait on the
+		// job status.
+		boolean success = false;
+		while (deadline.hasTimeLeft() && !success) {
+			success = true;
+			for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+				if (vertex.getExecutionState() != ExecutionState.FAILED) {
+					success = false;
+					Thread.sleep(100);
+					break;
+				}
+			}
+		}
+		if (!success) { fail("Failed to wait until all execution vertices reached state FAILED."); }
+		// Still FAILING: the stubbed jobVertexInFinalState() blocks any further transition
+		assertEquals(JobStatus.FAILING, executionGraph.getState());
+
+		// The cancel call needs to change the state to CANCELLING
+		executionGraph.cancel();
+
+		assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+
+		// Restore the real jobVertexInFinalState() and invoke it to finalize the job state
+		doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+
+		executionGraph.jobVertexInFinalState();
+
+		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+	}
 }