You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/11/19 18:41:45 UTC
[2/8] flink git commit: [FLINK-3011] [runtime] Fix cancel during restart
[FLINK-3011] [runtime] Fix cancel during restart
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf065a38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf065a38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf065a38
Branch: refs/heads/release-0.10
Commit: bf065a380ceee8d450d8188377041e5eab4b671c
Parents: 839ae19
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 17 11:56:42 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 19 18:27:03 2015 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 29 +++-
.../ExecutionGraphRestartTest.java | 141 +++++++++++++++++++
2 files changed, 169 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf065a38/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index aae0b7c..1e5d02c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -711,6 +711,26 @@ public class ExecutionGraph implements Serializable {
return;
}
}
+ // Executions are being canceled. Go into cancelling and wait for
+ // all vertices to be in their final state.
+ else if (current == JobStatus.FAILING) {
+ if (transitionState(current, JobStatus.CANCELLING)) {
+ return;
+ }
+ }
+ // All vertices have been cancelled and it's safe to directly go
+ // into the canceled state.
+ else if (current == JobStatus.RESTARTING) {
+ synchronized (progressLock) {
+ if (transitionState(current, JobStatus.CANCELED)) {
+ postRunCleanup();
+ progressLock.notifyAll();
+
+ LOG.info("Canceled during restart.");
+ return;
+ }
+ }
+ }
else {
// no need to treat other states
return;
@@ -747,9 +767,16 @@ public class ExecutionGraph implements Serializable {
public void restart() {
try {
synchronized (progressLock) {
- if (state != JobStatus.RESTARTING) {
+ JobStatus current = state;
+
+ if (current == JobStatus.CANCELED) {
+ LOG.info("Canceled job during restart. Aborting restart.");
+ return;
+ }
+ else if (current != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
}
+
if (scheduler == null) {
throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bf065a38/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 57b1829..a50aa2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -37,6 +38,9 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
public class ExecutionGraphRestartTest {
@@ -158,4 +162,141 @@ public class ExecutionGraphRestartTest {
fail("Failed to wait until all execution attempts left the state DEPLOYING.");
}
}
+
+ @Test // FLINK-3011: cancel() while the job is RESTARTING
+ public void testCancelWhileRestarting() throws Exception {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleActorGateway(TestingUtils.directExecutionContext()),
+ NUM_TASKS);
+
+ scheduler.newInstanceAvailable(instance);
+
+ // Execution graph under test (single NoOpInvokable vertex, built below)
+ ExecutionGraph executionGraph = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "TestJob",
+ new Configuration(),
+ AkkaUtils.getDefaultTimeout());
+
+ JobVertex jobVertex = new JobVertex("NoOpInvokable");
+ jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setParallelism(NUM_TASKS);
+
+ JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+ // Unlimited retries and a huge retry delay, so the test itself decides when restart() is called
+ executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+ executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+ executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+ executionGraph.scheduleForExecution(scheduler);
+
+ assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+ // Kill the instance and poll until the job reaches RESTARTING (or the deadline expires)
+ instance.markDead();
+
+ Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+ while (deadline.hasTimeLeft() &&
+ executionGraph.getState() != JobStatus.RESTARTING) {
+
+ Thread.sleep(100);
+ }
+
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+ // cancel() while RESTARTING must go directly to CANCELED
+ executionGraph.cancel();
+
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+
+ // A restart() call after cancellation must be a no-op and leave the job CANCELED
+ executionGraph.restart();
+
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ }
+
+ @Test // FLINK-3011: cancel() while the job is FAILING
+ public void testCancelWhileFailing() throws Exception {
+ Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+
+ Instance instance = ExecutionGraphTestUtils.getInstance(
+ new SimpleActorGateway(TestingUtils.directExecutionContext()),
+ NUM_TASKS);
+
+ scheduler.newInstanceAvailable(instance);
+
+ // Execution graph under test (single NoOpInvokable vertex, built below)
+ ExecutionGraph executionGraph = new ExecutionGraph(
+ TestingUtils.defaultExecutionContext(),
+ new JobID(),
+ "TestJob",
+ new Configuration(),
+ AkkaUtils.getDefaultTimeout());
+
+ // Wrap the graph in a Mockito spy so jobVertexInFinalState() can be stubbed
+ executionGraph = spy(executionGraph);
+
+ // Stub jobVertexInFinalState() to do nothing, so the job status
+ // cannot advance out of FAILING while the test inspects it.
+ doNothing().when(executionGraph).jobVertexInFinalState();
+
+ JobVertex jobVertex = new JobVertex("NoOpInvokable");
+ jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ jobVertex.setParallelism(NUM_TASKS);
+
+ JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
+
+ // Unlimited retries and a huge retry delay, so no automatic restart interferes with the test
+ executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
+ executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
+ executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, executionGraph.getState());
+
+ executionGraph.scheduleForExecution(scheduler);
+
+ assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+ // Kill the instance...
+ instance.markDead();
+
+ Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+
+ // ...and poll until every execution vertex is FAILED. The job status
+ // is useless as a signal here: jobVertexInFinalState() is stubbed, so
+ // the job stays FAILING regardless of vertex progress.
+ boolean success = false; // NOTE(review): not asserted after the loop; a timeout goes unreported here
+ while (deadline.hasTimeLeft() && !success) {
+ success = true;
+ for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+ if (vertex.getExecutionState() != ExecutionState.FAILED) {
+ success = false;
+ Thread.sleep(100);
+ break;
+ }
+ }
+ }
+
+ // Still FAILING, because the stubbed jobVertexInFinalState() never advanced the job status
+ assertEquals(JobStatus.FAILING, executionGraph.getState());
+
+ // cancel() while FAILING must move to CANCELLING (not directly to CANCELED)
+ executionGraph.cancel();
+
+ assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+
+ // Restore the real jobVertexInFinalState() (the spy remains) and invoke it to finish cancelling
+ doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+
+ executionGraph.jobVertexInFinalState();
+
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ }
}