Posted to commits@flink.apache.org by se...@apache.org on 2017/05/03 18:07:51 UTC

[1/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

Repository: flink
Updated Branches:
  refs/heads/master 821ec80d7 -> 8ed85fe49


http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
new file mode 100644
index 0000000..4dd55ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.Random;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Mockito.*;
+
+public class GlobalModVersionTest {
+
+	/**
+	 * Tests that failures during a global cancellation are not handed to the local
+	 * failover strategy.
+	 */
+	@Test
+	public void testNoLocalFailoverWhileCancelling() throws Exception {
+		final FailoverStrategy mockStrategy = mock(FailoverStrategy.class);
+
+		final ExecutionGraph graph = createSampleGraph(mockStrategy);
+
+		final ExecutionVertex testVertex = getRandomVertex(graph);
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1L, graph.getGlobalModVersion());
+
+		// wait until everything is running
+		for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			final Execution exec = v.getCurrentExecutionAttempt();
+			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, 1000);
+			exec.switchToRunning();
+			assertEquals(ExecutionState.RUNNING, exec.getState());
+		}
+
+		// now cancel the job
+		graph.cancel();
+		assertEquals(2L, graph.getGlobalModVersion());
+
+		// everything should be cancelling
+		for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			final Execution exec = v.getCurrentExecutionAttempt();
+			assertEquals(ExecutionState.CANCELING, exec.getState());
+		}
+
+		// let a vertex fail
+		testVertex.getCurrentExecutionAttempt().fail(new Exception("test exception"));
+
+		// all cancellations are done now
+		for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			final Execution exec = v.getCurrentExecutionAttempt();
+			exec.cancelingComplete();
+		}
+
+		assertEquals(JobStatus.CANCELED, graph.getState());
+
+		// no failure notification at all
+		verify(mockStrategy, times(0)).onTaskFailure(any(Execution.class), any(Throwable.class));
+	}
+
+	/**
+	 * Tests that failures during a global failover are not handed to the local
+	 * failover strategy.
+	 */
+	@Test
+	public void testNoLocalFailoverWhileFailing() throws Exception {
+		final FailoverStrategy mockStrategy = mock(FailoverStrategy.class);
+
+		final ExecutionGraph graph = createSampleGraph(mockStrategy);
+
+		final ExecutionVertex testVertex = getRandomVertex(graph);
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		// wait until everything is running
+		for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			final Execution exec = v.getCurrentExecutionAttempt();
+			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, 1000);
+			exec.switchToRunning();
+			assertEquals(ExecutionState.RUNNING, exec.getState());
+		}
+
+		// now send the job into global failover
+		graph.failGlobal(new Exception("global failover"));
+		assertEquals(JobStatus.FAILING, graph.getState());
+		assertEquals(2L, graph.getGlobalModVersion());
+
+		// another attempt to fail global should not do anything
+		graph.failGlobal(new Exception("should be ignored"));
+		assertEquals(JobStatus.FAILING, graph.getState());
+		assertEquals(2L, graph.getGlobalModVersion());
+
+		// everything should be cancelling
+		for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			final Execution exec = v.getCurrentExecutionAttempt();
+			assertEquals(ExecutionState.CANCELING, exec.getState());
+		}
+
+		// let a vertex fail
+		testVertex.getCurrentExecutionAttempt().fail(new Exception("test exception"));
+
+		// all cancellations are done now
+		for (ExecutionVertex v : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			final Execution exec = v.getCurrentExecutionAttempt();
+			exec.cancelingComplete();
+		}
+
+		assertEquals(JobStatus.RESTARTING, graph.getState());
+
+		// no failure notification at all
+		verify(mockStrategy, times(0)).onTaskFailure(any(Execution.class), any(Throwable.class));
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private ExecutionGraph createSampleGraph(FailoverStrategy failoverStrategy) throws Exception {
+
+		final JobID jid = new JobID();
+		final int parallelism = new Random().nextInt(10) + 1;
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		// build a simple execution graph with one job vertex and a random parallelism
+		final ExecutionGraph graph = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jid,
+				"test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				Time.seconds(10),
+				new InfiniteDelayRestartStrategy(),
+				new CustomStrategy(failoverStrategy),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				slotProvider,
+				getClass().getClassLoader());
+
+		JobVertex jv = new JobVertex("test vertex");
+		jv.setInvokableClass(NoOpInvokable.class);
+		jv.setParallelism(parallelism);
+
+		JobGraph jg = new JobGraph(jid, "testjob", jv);
+		graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+		return graph;
+	}
+
+	private static ExecutionVertex getRandomVertex(ExecutionGraph eg) {
+		final ExecutionVertex[] vertices = eg.getVerticesTopologically().iterator().next().getTaskVertices();
+		return vertices[new Random().nextInt(vertices.length)];
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class CustomStrategy implements Factory {
+
+		private final FailoverStrategy failoverStrategy;
+
+		CustomStrategy(FailoverStrategy failoverStrategy) {
+			this.failoverStrategy = failoverStrategy;
+		}
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			return failoverStrategy;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
new file mode 100644
index 0000000..713aece
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+
+import static org.junit.Assert.*;
+
+/**
+ * These tests make sure that global failover (restart all) always takes precedence over
+ * local recovery strategies.
+ * 
+ * <p>This test must be in the package it resides in, because it uses package-private methods
+ * from the ExecutionGraph classes.
+ */
+public class IndividualRestartsConcurrencyTest {
+
+	/**
+	 * Tests that a cancellation concurrent to a local failover leads to a properly
+	 * cancelled state.
+	 */
+	@Test
+	public void testCancelWhileInLocalFailover() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cancel the job to go into cancelling
+		//  - resume in local recovery action
+		//  - validate that this does in fact not start a new task, because the graph as a
+		//    whole should now be cancelled already
+
+		final JobID jid = new JobID();
+		final int parallelism = 2;
+
+		final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		final ExecutionGraph graph = createSampleGraph(
+				jid,
+				new IndividualFailoverWithCustomExecutor(executor),
+				slotProvider,
+				2);
+
+		final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+		final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+		final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		// let one of the vertices fail - that triggers a local recovery action
+		vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+		// graph should still be running and the failover recovery action should be queued
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// now cancel the job
+		graph.cancel();
+
+		assertEquals(JobStatus.CANCELLING, graph.getState());
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+		// let the recovery action continue
+		executor.trigger();
+
+		// now report that cancelling is complete for the other vertex
+		vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+		assertEquals(JobStatus.CANCELED, graph.getState());
+		assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+		assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+		// make sure all slots are recycled
+		assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+	}
+
+	/**
+	 * Tests that a terminal global failure concurrent to a local failover
+	 * leads to a properly failed state.
+	 */
+	@Test
+	public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cause a global failure
+		//  - resume in local recovery action
+		//  - validate that this does in fact not start a new task, because the graph as a
+		//    whole should now be terminally failed already
+
+		final JobID jid = new JobID();
+		final int parallelism = 2;
+
+		final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		final ExecutionGraph graph = createSampleGraph(
+				jid,
+				new IndividualFailoverWithCustomExecutor(executor),
+				slotProvider,
+				2);
+
+		final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+		final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+		final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		// let one of the vertices fail - that triggers a local recovery action
+		vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+		// graph should still be running and the failover recovery action should be queued
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// now fail the job globally
+		graph.failGlobal(new Exception("test exception"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+		// let the recovery action continue
+		executor.trigger();
+
+		// now report that cancelling is complete for the other vertex
+		vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+		assertEquals(JobStatus.FAILED, graph.getState());
+		assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+		assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+		// make sure all slots are recycled
+		assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+	}
+
+	/**
+	 * Tests that a local failover does not try to trump a global failover.
+	 */
+	@Test
+	public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cause a global failure that is recovering immediately
+		//  - resume in local recovery action
+		//  - validate that this does in fact not cause another task restart, because the global
+		//    recovery should already have restarted the task graph
+
+		final JobID jid = new JobID();
+		final int parallelism = 2;
+
+		final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		final ExecutionGraph graph = createSampleGraph(
+				jid,
+				new IndividualFailoverWithCustomExecutor(executor),
+				new FixedDelayRestartStrategy(1, 0), // one restart, no delay
+				slotProvider,
+				2);
+
+		final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+		final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+		final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		// let one of the vertices fail - that triggers a local recovery action
+		vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+
+		// graph should still be running and the failover recovery action should be queued
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// now fail the job globally
+		graph.failGlobal(new Exception("test exception"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+		assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.CANCELING, vertex1.getCurrentExecutionAttempt().getState());
+
+		// now report that cancelling is complete for the other vertex
+		vertex1.getCurrentExecutionAttempt().cancelingComplete();
+
+		waitUntilJobStatus(graph, JobStatus.RUNNING, 1000);
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+		waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+		vertex1.getCurrentExecutionAttempt().switchToRunning();
+		vertex2.getCurrentExecutionAttempt().switchToRunning();
+		assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+
+		// let the recovery action continue - this should do nothing any more
+		executor.trigger();
+
+		// validate that the graph is still peachy
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+		assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+		assertEquals(1, vertex2.getCopyOfPriorExecutionsList().size());
+
+		// make sure all slots are in use
+		assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private ExecutionGraph createSampleGraph(
+			JobID jid,
+			Factory failoverStrategy,
+			SlotProvider slotProvider,
+			int parallelism) throws Exception {
+
+		return createSampleGraph(jid, failoverStrategy, new NoRestartStrategy(), slotProvider, parallelism);
+	}
+
+	private ExecutionGraph createSampleGraph(
+			JobID jid,
+			Factory failoverStrategy,
+			RestartStrategy restartStrategy,
+			SlotProvider slotProvider,
+			int parallelism) throws Exception {
+
+		// build a simple execution graph with one job vertex, using the given parallelism
+		final ExecutionGraph graph = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jid,
+				"test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				Time.seconds(10),
+				restartStrategy,
+				failoverStrategy,
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				slotProvider,
+				getClass().getClassLoader());
+
+		JobVertex jv = new JobVertex("test vertex");
+		jv.setInvokableClass(NoOpInvokable.class);
+		jv.setParallelism(parallelism);
+
+		JobGraph jg = new JobGraph(jid, "testjob", jv);
+		graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+		return graph;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class IndividualFailoverWithCustomExecutor implements Factory {
+
+		private final Executor executor;
+
+		IndividualFailoverWithCustomExecutor(Executor executor) {
+			this.executor = executor;
+		}
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			return new RestartIndividualStrategy(executionGraph, executor);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
deleted file mode 100644
index c107d54..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A job status listener that waits lets one block until the job is in a terminal state.
- */
-public class TerminalJobStatusListener  implements JobStatusListener {
-
-	private final OneShotLatch terminalStateLatch = new OneShotLatch();
-
-	public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
-		terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
-	}
-
-	@Override
-	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
-		if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
-			terminalStateLatch.trigger();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
new file mode 100644
index 0000000..6e67c1a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+
+import java.util.UUID;
+
+/**
+ * A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and does not
+ * support any more advanced operations.
+ */
+public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
+
+	private final String address = UUID.randomUUID().toString();
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
+
+	@Override
+	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {}
+
+	@Override
+	public void stopCluster(ApplicationStatus applicationStatus, String message) {}
+
+	@Override
+	public Future<StackTrace> requestStackTrace(Time timeout) {
+		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	}
+
+	@Override
+	public Future<StackTraceSampleResponse> requestStackTraceSample(
+			ExecutionAttemptID executionAttemptID,
+			int sampleId,
+			int numSamples,
+			Time delayBetweenSamples,
+			int maxStackTraceDepth,
+			Time timeout) {
+		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	}
+
+	@Override
+	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+		return FlinkCompletableFuture.completed(Acknowledge.get());
+	}
+
+	@Override
+	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return FlinkCompletableFuture.completed(Acknowledge.get());
+	}
+
+	@Override
+	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return FlinkCompletableFuture.completed(Acknowledge.get());
+	}
+
+	@Override
+	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return FlinkCompletableFuture.completed(Acknowledge.get());
+	}
+
+	@Override
+	public void failPartition(ExecutionAttemptID executionAttemptID) {}
+
+	@Override
+	public void notifyCheckpointComplete(
+			ExecutionAttemptID executionAttemptID,
+			JobID jobId,
+			long checkpointId,
+			long timestamp) {}
+
+	@Override
+	public void triggerCheckpoint(
+			ExecutionAttemptID executionAttemptID,
+			JobID jobId,
+			long checkpointId,
+			long timestamp,
+			CheckpointOptions checkpointOptions) {}
+
+	@Override
+	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	}
+
+	@Override
+	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
new file mode 100644
index 0000000..2cf1eec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A testing utility slot provider that simply has a predefined pool of slots.
+ */
+public class SimpleSlotProvider implements SlotProvider, SlotOwner {
+
+	private final ArrayDeque<AllocatedSlot> slots;
+
+	public SimpleSlotProvider(JobID jobId, int numSlots) {
+		this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
+	}
+
+	public SimpleSlotProvider(JobID jobId, int numSlots, TaskManagerGateway taskManagerGateway) {
+		checkNotNull(jobId, "jobId");
+		checkArgument(numSlots >= 0, "numSlots must be >= 0");
+
+		this.slots = new ArrayDeque<>(numSlots);
+
+		for (int i = 0; i < numSlots; i++) {
+			AllocatedSlot as = new AllocatedSlot(
+					new AllocationID(),
+					jobId,
+					new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
+					0,
+					ResourceProfile.UNKNOWN,
+					taskManagerGateway);
+			slots.add(as);
+		}
+	}
+
+	@Override
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+		final AllocatedSlot slot;
+
+		synchronized (slots) {
+			if (slots.isEmpty()) {
+				slot = null;
+			} else {
+				slot = slots.removeFirst();
+			}
+		}
+
+		if (slot != null) {
+			SimpleSlot result = new SimpleSlot(slot, this, 0);
+			return FlinkCompletableFuture.completed(result);
+		}
+		else {
+			return FlinkCompletableFuture.completedExceptionally(new NoResourceAvailableException());
+		}
+	}
+
+	@Override
+	public boolean returnAllocatedSlot(Slot slot) {
+		synchronized (slots) {
+			slots.add(slot.getAllocatedSlot());
+		}
+		return true;
+	}
+
+	public int getNumberOfAvailableSlots() {
+		synchronized (slots) {
+			return slots.size();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 6316bfd..195baa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -447,7 +447,7 @@ public class JobManagerTest extends TestLogger {
 							vertex.getCurrentExecutionAttempt().getAttemptId());
 
 						// Reset execution => new execution attempt
-						vertex.resetForNewExecution();
+						vertex.resetForNewExecution(System.currentTimeMillis(), 1L);
 
 						// Producer finished, request state
 						Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 49d0239..b1bb548 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -53,7 +52,6 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 	private int numSlotsPerTM = 1;
 	private int parallelism = numTMs * numSlotsPerTM;
 
-	private Configuration configuration;
 	private LeaderElectionRetrievalTestingCluster cluster = null;
 	private JobGraph job = createBlockingJob(parallelism);
 
@@ -61,7 +59,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 	public void before() throws TimeoutException, InterruptedException {
 		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
 
-		configuration = new Configuration();
+		Configuration configuration = new Configuration();
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
@@ -110,12 +108,9 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
-		TerminalJobStatusListener testListener = new TerminalJobStatusListener();
-		executionGraph.registerJobStatusListener(testListener);
-
 		cluster.revokeLeadership();
 
-		testListener.waitForTerminalState(30000);
+		executionGraph.getTerminationFuture().get(30, TimeUnit.SECONDS);
 	}
 
 	public JobGraph createBlockingJob(int parallelism) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 7102f27..4019d63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -147,6 +148,7 @@ public class RescalePartitionerTest extends TestLogger {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
+			new RestartAllStrategy.Factory(),
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
new file mode 100644
index 0000000..07c7bd3
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayDeque;
+import java.util.concurrent.Executor;
+
+/**
+ * An executor that does not immediately execute a Runnable, but only executes it
+ * upon an explicit trigger.
+ * 
+ * <p>This executor can be used in concurrent tests to control when certain asynchronous
+ * actions should happen.
+ */
+public class ManuallyTriggeredDirectExecutor implements Executor {
+
+	private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
+
+	@Override
+	public void execute(@Nonnull Runnable command) {
+		synchronized (queuedRunnables) {
+			queuedRunnables.addLast(command);
+		}
+	}
+
+	/**
+	 * Triggers the next queued runnable and executes it synchronously.
+	 * This method throws an exception if no Runnable is currently queued.
+	 */
+	public void trigger() {
+		final Runnable next;
+
+		synchronized (queuedRunnables) {
+			next = queuedRunnables.pollFirst();
+		}
+
+		if (next != null) {
+			next.run();
+		}
+		else {
+			throw new IllegalStateException("No runnable available");
+		}
+	}
+
+	/**
+	 * Gets the number of Runnables currently queued.
+	 */
+	public int numQueuedRunnables() {
+		synchronized (queuedRunnables) {
+			return queuedRunnables.size();
+		}
+	}
+}
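A minimal usage sketch of the executor above, following the pattern in IndividualRestartsConcurrencyTest; the runnable here is a stand-in for the queued recovery action.

    ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();

    // the asynchronous action is queued rather than run immediately
    executor.execute(() -> System.out.println("recovery action"));
    assertEquals(1, executor.numQueuedRunnables());

    // the test can now change the job state (cancel, fail globally, ...)
    // before letting the queued action run synchronously on this thread
    executor.trigger();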


[3/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

Posted by se...@apache.org.
[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobal()' to differentiate it from fine-grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration
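
For orientation, here is a minimal sketch of the shape of the new abstraction as it is exercised by GlobalModVersionTest and IndividualRestartsConcurrencyTest above. The commit adds FailoverStrategy as a base class; it is sketched as an interface for brevity, and only the members visible in this patch (onTaskFailure and the nested Factory) are shown.

    // Sketch only: reconstructed from how the tests use the abstraction,
    // not a copy of the class added under runtime/executiongraph/failover.
    public interface FailoverStrategy {

        // Called when a single task execution fails; the strategy decides
        // which part of the ExecutionGraph to restart.
        void onTaskFailure(Execution taskExecution, Throwable cause);

        // Each ExecutionGraph obtains its own strategy instance via a factory,
        // e.g. new RestartAllStrategy.Factory() for the default restart-all behaviour.
        interface Factory {
            FailoverStrategy create(ExecutionGraph executionGraph);
        }
    }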


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ed85fe4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ed85fe4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ed85fe4

Branch: refs/heads/master
Commit: 8ed85fe49b7595546a8f968e0faa1fa7d4da47ec
Parents: e006127
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 21 19:13:34 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java  |   7 +
 .../java/org/apache/flink/util/StringUtils.java |  20 +
 .../flink/runtime/execution/ExecutionState.java |  33 +-
 .../flink/runtime/executiongraph/Execution.java |  56 ++-
 .../runtime/executiongraph/ExecutionGraph.java  | 315 +++++++++++----
 .../executiongraph/ExecutionGraphBuilder.java   |  10 +
 .../executiongraph/ExecutionJobVertex.java      | 172 +++-----
 .../runtime/executiongraph/ExecutionVertex.java |  87 +++-
 .../GlobalModVersionMismatch.java               |  46 +++
 .../failover/FailoverStrategy.java              |  92 +++++
 .../failover/FailoverStrategyLoader.java        |  72 ++++
 .../failover/RestartAllStrategy.java            |  80 ++++
 .../failover/RestartIndividualStrategy.java     | 173 ++++++++
 .../metrics/NumberOfFullRestartsGauge.java      |  47 +++
 .../flink/runtime/jobmaster/JobMaster.java      |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   1 +
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../ExecutionGraphDeploymentTest.java           | 114 ++----
 .../ExecutionGraphMetricsTest.java              |  12 +-
 .../ExecutionGraphRestartTest.java              |  33 +-
 .../ExecutionGraphSchedulingTest.java           |   9 +-
 .../ExecutionGraphSignalsTest.java              | 399 -------------------
 .../executiongraph/ExecutionGraphStopTest.java  | 176 ++++++++
 .../ExecutionGraphSuspendTest.java              | 307 ++++++++++++++
 .../executiongraph/ExecutionGraphTestUtils.java | 263 ++++++++++--
 .../ExecutionGraphVariousFailuesTest.java       | 118 ++++++
 .../ExecutionVertexCancelTest.java              |  45 +--
 .../ExecutionVertexLocalityTest.java            |   2 +-
 .../executiongraph/ExecutionVertexStopTest.java | 129 ------
 .../executiongraph/FinalizeOnMasterTest.java    |  96 +++++
 .../executiongraph/GlobalModVersionTest.java    | 212 ++++++++++
 .../IndividualRestartsConcurrencyTest.java      | 332 +++++++++++++++
 .../TerminalJobStatusListener.java              |  45 ---
 .../utils/SimpleAckingTaskManagerGateway.java   | 121 ++++++
 .../utils/SimpleSlotProvider.java               | 106 +++++
 .../runtime/jobmanager/JobManagerTest.java      |   2 +-
 .../LeaderChangeJobRecoveryTest.java            |   9 +-
 .../partitioner/RescalePartitionerTest.java     |   2 +
 .../ManuallyTriggeredDirectExecutor.java        |  70 ++++
 41 files changed, 2808 insertions(+), 1021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 10d9e16..5481d7a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -73,6 +73,13 @@ public class JobManagerOptions {
 			.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
 	/**
+	 * The failover strategy, i.e., how the job computation recovers from task failures.
+	 */
+	public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+		key("jobmanager.execution.failover-strategy")
+			.defaultValue("full");
+
+	/**
 	 * This option specifies the interval in order to trigger a resource manager reconnection if the connection
 	 * to the resource manager has been lost.
 	 *
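
A hedged example of selecting the strategy programmatically: the key name and its "full" default come from the option above, and the equivalent flink-conf.yaml entry is shown as a comment. The FailoverStrategyLoader listed in the change summary is presumably what interprets the value when the ExecutionGraph is built.

    // equivalent flink-conf.yaml entry:
    //   jobmanager.execution.failover-strategy: full
    Configuration config = new Configuration();
    config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");

    // read back; falls back to the "full" (restart-all) default if unset
    String strategyName = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);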

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index abd6ba6..6638062 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -330,6 +330,26 @@ public final class StringUtils {
 		return true;
 	}
 
+	/**
+	 * If both string arguments are non-null, this method concatenates them with ' and '.
+	 * If only one of the arguments is non-null, this method returns the non-null argument.
+	 * If both arguments are null, this method returns null.
+	 * 
+	 * @param s1 The first string argument
+	 * @param s2 The second string argument
+	 * 
+	 * @return The concatenated string, the single non-null argument, or null
+	 */
+	@Nullable
+	public static String concatenateWithAnd(@Nullable String s1, @Nullable String s2) {
+		if (s1 != null) {
+			return s2 == null ? s1 : s1 + " and " + s2;
+		}
+		else {
+			return s2 != null ? s2 : null;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/** Prevent instantiation of this utility class */
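
A quick illustration of the contract documented above (the argument strings are arbitrary examples):

    StringUtils.concatenateWithAnd("local failure", "global failure");  // "local failure and global failure"
    StringUtils.concatenateWithAnd(null, "global failure");             // "global failure"
    StringUtils.concatenateWithAnd("local failure", null);              // "local failure"
    StringUtils.concatenateWithAnd(null, null);                         // null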

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index d6ff0cd..53ca8b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -25,27 +25,27 @@ package org.apache.flink.runtime.execution;
  * <pre>{@code
  *
  *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
- *            |         |            |          |
- *            |         |            |   +------+
- *            |         |            V   V
- *            |         |         CANCELLING -----+----> CANCELED
- *            |         |                         |
- *            |        +-------------------------+
- *            |
- *            |                                   ... -> FAILED
- *           V
+ *        |            |            |          |
+ *        |            |            |   +------+
+ *        |            |            V   V
+ *        |            |         CANCELLING -----+----> CANCELED
+ *        |            |                         |
+ *        |            +-------------------------+
+ *        |
+ *        |                                   ... -> FAILED
+ *        V
  *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
  *
  * }</pre>
  *
  * <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED}
  * state if job manager fail over, and the {@code RECONCILING} state can switch into
- * any existing task state.</p>
+ * any existing task state.
  *
- * <p>It is possible to enter the {@code FAILED} state from any other state.</p>
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
  *
  * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
- * considered terminal states.</p>
+ * considered terminal states.
  */
 public enum ExecutionState {
 
@@ -56,7 +56,14 @@ public enum ExecutionState {
 	DEPLOYING,
 	
 	RUNNING,
-	
+
+	/**
+	 * This state marks "successfully completed". It can only be reached when a
+	 * program reaches the "end of its input". The "end of input" can be reached
+	 * when consuming a bounded input (fixed set of files, bounded query, etc.) or
+	 * when stopping a program (not cancelling!), which makes the input look like
+	 * it reached its end at a specific point.
+	 */
 	FINISHED,
 	
 	CANCELING,

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2680849..c0f1f39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -74,12 +74,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
- * or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
+ * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times
+ * (for recovery, re-computation, re-configuration), this class tracks the state of a single execution
+ * of that vertex and the resources.
  * 
- * <p>NOTE ABOUT THE DESIGN RATIONAL:
+ * <h2>Lock free state transitions</h2>
  * 
- * <p>In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * In several points of the code, we need to deal with possible concurrent state changes and actions.
  * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
  * 
  * <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
@@ -113,6 +114,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	/** The unique ID marking the specific execution instant of the task */
 	private final ExecutionAttemptID attemptId;
 
+	/** Gets the global modification version of the execution graph when this execution was created.
+	 * This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
+	 * to resolve conflicts between concurrent modification by global and local failover actions. */
+	private final long globalModVersion;
+
+	/** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()} */ 
 	private final long[] stateTimestamps;
 
 	private final int attemptNumber;
@@ -146,10 +153,27 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Creates a new Execution attempt.
+	 * 
+	 * @param executor
+	 *             The executor used to dispatch callbacks from futures and asynchronous RPC calls.
+	 * @param vertex
+	 *             The execution vertex to which this Execution belongs
+	 * @param attemptNumber
+	 *             The execution attempt number.
+	 * @param globalModVersion
+	 *             The global modification version of the execution graph when this execution was created
+	 * @param startTimestamp
+	 *             The timestamp that marks the creation of this Execution
+	 * @param timeout
+	 *             The timeout for RPC calls like deploy/cancel/stop.
+	 */
 	public Execution(
 			Executor executor,
 			ExecutionVertex vertex,
 			int attemptNumber,
+			long globalModVersion,
 			long startTimestamp,
 			Time timeout) {
 
@@ -158,6 +182,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.attemptId = new ExecutionAttemptID();
 		this.timeout = checkNotNull(timeout);
 
+		this.globalModVersion = globalModVersion;
 		this.attemptNumber = attemptNumber;
 
 		this.stateTimestamps = new long[ExecutionState.values().length];
@@ -190,6 +215,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return state;
 	}
 
+	/**
+	 * Gets the global modification version of the execution graph when this execution was created.
+	 * 
+	 * <p>This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
+	 * to resolve conflicts between concurrent modification by global and local failover actions.
+	 */
+	public long getGlobalModVersion() {
+		return globalModVersion;
+	}
+
 	public SimpleSlot getAssignedResource() {
 		return assignedResource;
 	}
@@ -252,6 +287,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	//  Actions
 	// --------------------------------------------------------------------------------------------
 
+	public boolean scheduleForExecution() {
+		SlotProvider resourceProvider = getVertex().getExecutionGraph().getSlotProvider();
+		boolean allowQueued = getVertex().getExecutionGraph().isQueuedSchedulingAllowed();
+		return scheduleForExecution(resourceProvider, allowQueued);
+	}
+
 	/**
 	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
 	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
@@ -381,9 +422,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				taskState,
 				attemptNumber);
 
-			// register this execution at the execution graph, to receive call backs
-			vertex.getExecutionGraph().registerExecution(this);
-			
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
 			final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
@@ -823,7 +861,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				if (current != FAILED) {
 					String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
 					LOG.error(message);
-					vertex.getExecutionGraph().fail(new Exception(message));
+					vertex.getExecutionGraph().failGlobal(new Exception(message));
 				}
 				return;
 			}
@@ -1069,7 +1107,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners may not affect the main logic)
 			try {
-				vertex.notifyStateTransition(attemptId, targetState, error);
+				vertex.notifyStateTransition(this, targetState, error);
 			}
 			catch (Throwable t) {
 				LOG.error("Error while notifying execution graph of execution state transition.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index fff1ea2..5eaa637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -42,11 +40,15 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,10 +67,12 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +95,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -100,9 +105,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * The execution graph is the central data structure that coordinates the distributed
  * execution of a data flow. It keeps representations of each parallel task, each
- * intermediate result, and the communication between them.
+ * intermediate stream, and the communication between them.
  *
- * The execution graph consists of the following constructs:
+ * <p>The execution graph consists of the following constructs:
  * <ul>
  *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
  *         "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
@@ -118,12 +123,41 @@ import static org.apache.flink.util.Preconditions.checkState;
  *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
  *         address the message receiver.</li>
  * </ul>
+ * 
+ * <h2>Global and local failover</h2>
+ * 
+ * The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
+ * 
+ * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts the whole
+ * data flow graph from the last completed checkpoint. Global failover is considered the
+ * "fallback strategy" that is used when a local failover is unsuccessful, or when an issue is
+ * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
+ * 
+ * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
+ * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
+ * attempts to restart as little as possible, but as much as necessary.
+ * 
+ * <p>Between local and global failover, the global failover always takes precedence, because it
+ * is the core mechanism that the ExecutionGraph relies on to bring back consistency. To guarantee
+ * that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
+ * with every global failover (and other global actions, like job cancellation or terminal
+ * failure). Local failover is always scoped by the modification version that the execution graph
+ * had when the failover was triggered. If a new global modification version is reached during a
+ * local failover (meaning there is a concurrent global failover), the failover strategy has to
+ * yield before the global failover.
  */
 public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
 
+	/** In place updater for the execution graph's current state. Avoids having to use an
+	 * AtomicReference and thus makes the frequent read access a bit faster */
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
 
+	/** In place updater for the execution graph's current global recovery version.
+	 * Avoids having to use an AtomicLong and thus makes the frequent read access a bit faster */
+	private static final AtomicLongFieldUpdater<ExecutionGraph> GLOBAL_VERSION_UPDATER =
+			AtomicLongFieldUpdater.newUpdater(ExecutionGraph.class, "globalModVersion");
+
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
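
For illustration only, a minimal sketch (not part of the patch) of the versioning pattern
described in the class Javadoc above: a local recovery action captures the global modification
version at the time of the failure and yields if a concurrent global action has bumped it in
the meantime. The helper method below and its placement are assumptions; only
getGlobalModVersion(), resetForNewExecution(...) and scheduleForExecution() come from this commit.

    // hypothetical helper inside a failover strategy - illustrative only
    void restartVertexLocally(ExecutionGraph graph, ExecutionVertex vertex) {
        // version of the graph at the time the task failure was observed
        final long localActionVersion = graph.getGlobalModVersion();

        try {
            // resetForNewExecution() atomically re-checks the version and throws
            // GlobalModVersionMismatch if a global failover happened concurrently
            final Execution newAttempt =
                    vertex.resetForNewExecution(System.currentTimeMillis(), localActionVersion);
            newAttempt.scheduleForExecution();
        }
        catch (GlobalModVersionMismatch e) {
            // a concurrent global failover takes precedence - yield and do nothing
        }
    }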
 
@@ -169,6 +203,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Listeners that receive messages whenever a single task execution changes its status */
 	private final List<ExecutionStatusListener> executionListeners;
 
+	/** The implementation that decides how to recover the failures of tasks */
+	private final FailoverStrategy failoverStrategy;
+
 	/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
 	 * the execution graph transitioned into a certain state. The index into this array is the
 	 * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
@@ -178,6 +215,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The timeout for all messages that require a response/acknowledgement */
 	private final Time rpcCallTimeout;
 
+	/** The timeout for bulk slot allocation (eager scheduling mode). After this timeout,
+	 * slots are released and a recovery is triggered */
+	private final Time scheduleAllocationTimeout;
+
 	/** Strategy to use for restarts */
 	private final RestartStrategy restartStrategy;
 
@@ -190,6 +231,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Registered KvState instances reported by the TaskManagers. */
 	private final KvStateLocationRegistry kvStateLocationRegistry;
 
+	/** The total number of vertices currently in the execution graph */
 	private int numVerticesTotal;
 
 	// ------ Configuration of the Execution -------
@@ -203,8 +245,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-	private final Time scheduleAllocationTimeout;
-
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	private final AtomicInteger verticesFinished;
@@ -212,6 +252,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Current status of the job execution */
 	private volatile JobStatus state = JobStatus.CREATED;
 
+	/** A future that completes once the job has reached a terminal state */
+	private volatile CompletableFuture<JobStatus> terminationFuture;
+
+	/** On each global recovery, this version is incremented. The version is used to resolve
+	 * conflicts between concurrent restart attempts by local failover strategies */
+	private volatile long globalModVersion;
+
 	/** The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure */
 	private volatile Throwable failureCause;
@@ -233,7 +280,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * This constructor is for tests only, because it does not include class loading information.
+	 * This constructor is for tests only, because it sets default values for many fields.
 	 */
 	@VisibleForTesting
 	ExecutionGraph(
@@ -255,6 +302,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			serializedConfig,
 			timeout,
 			restartStrategy,
+			new RestartAllStrategy.Factory(),
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
 			slotProvider,
@@ -270,6 +318,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
 			RestartStrategy restartStrategy,
+			FailoverStrategy.Factory failoverStrategyFactory,
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			SlotProvider slotProvider,
@@ -322,6 +371,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
 
 		this.verticesFinished = new AtomicInteger();
+
+		this.globalModVersion = 1L;
+
+		// the failover strategy must be instantiated last, so that the execution graph
+		// is ready by the time the failover strategy sees it
+		this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
+		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -541,6 +597,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return failureCause;
 	}
 
+	/**
+	 * Gets the number of full restarts that the execution graph went through.
+	 * If a full restart recovery is currently pending, this recovery is included in the
+	 * count.
+	 * 
+	 * @return The number of full restarts so far
+	 */
+	public long getNumberOfFullRestarts() {
+		// subtract one, because the version starts at one
+		return globalModVersion - 1;
+	}
+
 	@Override
 	public String getFailureCauseAsString() {
 		return ExceptionUtils.stringifyException(failureCause);
@@ -689,11 +757,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	// --------------------------------------------------------------------------------------------
 
 	public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
-					+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
-		}
 
+		LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
+				"vertices and {} intermediate results.",
+				topologiallySorted.size(), tasks.size(), intermediateResults.size());
+
+		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
 		final long createTimestamp = System.currentTimeMillis();
 
 		for (JobVertex jobVertex : topologiallySorted) {
@@ -704,7 +773,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			// create the execution job vertex and attach it to the graph
 			ExecutionJobVertex ejv =
-					new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
+					new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
 			ejv.connectToPredecessors(this.intermediateResults);
 
 			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -723,7 +792,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			this.verticesInCreationOrder.add(ejv);
 			this.numVerticesTotal += ejv.getParallelism();
+			newExecJobVertices.add(ejv);
 		}
+
+		terminationFuture = new FlinkCompletableFuture<>();
+		failoverStrategy.notifyNewVertices(newExecJobVertices);
 	}
 
 	public void scheduleForExecution() throws JobException {
@@ -856,7 +929,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 						// ExecutionGraph notices
 						// we need to go into recovery and make sure to release all slots
 						try {
-							fail(t);
+							failGlobal(t);
 						}
 						finally {
 							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
@@ -889,6 +962,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
 
+					// make sure no concurrent local actions interfere with the cancellation
+					incrementGlobalModVersion();
+
 					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
 					// cancel all tasks (that still need cancelling)
@@ -920,8 +996,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (current == JobStatus.RESTARTING) {
 				synchronized (progressLock) {
 					if (transitionState(current, JobStatus.CANCELED)) {
-						postRunCleanup();
-						progressLock.notifyAll();
+						onTerminalState(JobStatus.CANCELED);
 
 						LOG.info("Canceled during restart.");
 						return;
@@ -951,7 +1026,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * Suspends the current ExecutionGraph.
 	 *
 	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
-	 * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+	 * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
 	 *
 	 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
 	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
@@ -962,21 +1037,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		while (true) {
 			JobStatus currentState = state;
 
-			if (currentState.isGloballyTerminalState()) {
+			if (currentState.isTerminalState()) {
 				// stay in a terminal state
 				return;
 			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
 				this.failureCause = suspensionCause;
 
+				// make sure no concurrent local actions interfere with the cancellation
+				incrementGlobalModVersion();
+
 				for (ExecutionJobVertex ejv: verticesInCreationOrder) {
 					ejv.cancel();
 				}
 
 				synchronized (progressLock) {
-						postRunCleanup();
-						progressLock.notifyAll();
+					onTerminalState(JobStatus.SUSPENDED);
 
-						LOG.info("Job {} has been suspended.", getJobID());
+					LOG.info("Job {} has been suspended.", getJobID());
 				}
 
 				return;
@@ -984,7 +1061,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void fail(Throwable t) {
+	/**
+	 * Fails the execution graph globally. This failure will not be recovered by a specific
+	 * failover strategy, but results in a full restart of all tasks.
+	 * 
+	 * <p>This global failure is meant to be triggered in cases where the consistency of the
+	 * execution graph's state cannot be guaranteed any more (for example when catching unexpected
+	 * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
+	 * safe way to get consistency back.
+	 * 
+	 * @param t The exception that caused the failure.
+	 */
+	public void failGlobal(Throwable t) {
 		while (true) {
 			JobStatus current = state;
 			// stay in these states
@@ -1003,6 +1091,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
+				// make sure no concurrent local actions interfere with the cancellation
+				incrementGlobalModVersion();
+
 				// we build a future that is complete once all vertices have reached a terminal state
 				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
@@ -1044,23 +1135,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
 
-				if (slotProvider == null) {
-					throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null.");
-				}
-
 				this.currentExecutions.clear();
 
-				Collection<CoLocationGroup> colGroups = new HashSet<>();
+				final Collection<CoLocationGroup> colGroups = new HashSet<>();
+				final long resetTimestamp = System.currentTimeMillis();
 
 				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
 
 					CoLocationGroup cgroup = jv.getCoLocationGroup();
-					if(cgroup != null && !colGroups.contains(cgroup)){
+					if (cgroup != null && !colGroups.contains(cgroup)){
 						cgroup.resetConstraints();
 						colGroups.add(cgroup);
 					}
 
-					jv.resetForNewExecution();
+					jv.resetForNewExecution(resetTimestamp, globalModVersion);
 				}
 
 				for (int i = 0; i < stateTimestamps.length; i++) {
@@ -1083,7 +1171,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 		catch (Throwable t) {
 			LOG.warn("Failed to restart the job.", t);
-			fail(t);
+			failGlobal(t);
 		}
 	}
 
@@ -1124,32 +1212,45 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return null;
 	}
 
+	@VisibleForTesting
+	public Future<JobStatus> getTerminationFuture() {
+		return terminationFuture;
+	}
+
+	@VisibleForTesting
+	public JobStatus waitUntilTerminal() throws InterruptedException {
+		try {
+			return terminationFuture.get();
+		}
+		catch (ExecutionException e) {
+			// this should never happen
+			// it would be a bug, so we don't expect this to be handled and throw
+			// an unchecked exception here
+			throw new RuntimeException(e);
+		}
+	}
+
 	/**
-	 * For testing: This waits until the job execution has finished.
+	 * Gets the failover strategy used by the execution graph to recover from failures of tasks.
 	 */
-	public void waitUntilFinished() throws InterruptedException {
-		// we may need multiple attempts in the presence of failures / recovery
-		while (true) {
-			for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-				for (ExecutionVertex ev : ejv.getTaskVertices()) {
-					try {
-						ev.getCurrentExecutionAttempt().getTerminationFuture().get();
-					}
-					catch (ExecutionException e) {
-						// this should never happen
-						throw new RuntimeException(e);
-					}
-				}
-			}
+	public FailoverStrategy getFailoverStrategy() {
+		return this.failoverStrategy;
+	}
 
-			// now that all vertices have been (at some point) in a terminal state,
-			// we need to check if the job as a whole has entered a final state
-			if (state.isTerminalState()) {
-				return;
-			}
-		}
+	/**
+	 * Gets the current global modification version of the ExecutionGraph.
+	 * The global modification version is incremented with each global action (cancel/fail/restart)
+	 * and is used to disambiguate concurrent modifications between local and global
+	 * failover actions.
+	 */
+	long getGlobalModVersion() {
+		return globalModVersion;
 	}
 
+	// ------------------------------------------------------------------------
+	//  State Transitions
+	// ------------------------------------------------------------------------
+
 	private boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
@@ -1175,11 +1276,45 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	private long incrementGlobalModVersion() {
+		return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Job Status Progress
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Called whenever a vertex reaches state FINISHED (completed successfully).
+	 * Once all vertices are in the FINISHED state, the program is successfully done.
+	 */
 	void vertexFinished() {
-		int numFinished = verticesFinished.incrementAndGet();
+		final int numFinished = verticesFinished.incrementAndGet();
 		if (numFinished == numVerticesTotal) {
 			// done :-)
-			allVerticesInTerminalState();
+
+			// check whether we are still in "RUNNING" and trigger the final cleanup
+			if (state == JobStatus.RUNNING) {
+				// we do the final cleanup in the I/O executor, because it may involve
+				// some heavier work
+
+				try {
+					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+						ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+					}
+				}
+				catch (Throwable t) {
+					ExceptionUtils.rethrowIfFatalError(t);
+					failGlobal(new Exception("Failed to finalize execution on master", t));
+					return;
+				}
+
+				// if we do not make this state transition, then a concurrent
+				// cancellation or failure happened
+				if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
+					onTerminalState(JobStatus.FINISHED);
+				}
+			}
 		}
 	}
 
@@ -1187,6 +1322,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		verticesFinished.getAndDecrement();
 	}
 
+	/**
+	 * This method is a callback during cancellation/failover and called when all tasks
+	 * have reached a terminal state (cancelled/failed/finished).
+	 */
 	private void allVerticesInTerminalState() {
 		// we are done, transition to the final state
 		JobStatus current;
@@ -1194,14 +1333,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			current = this.state;
 
 			if (current == JobStatus.RUNNING) {
-				if (transitionState(current, JobStatus.FINISHED)) {
-					postRunCleanup();
-					break;
-				}
+				failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
 			}
 			else if (current == JobStatus.CANCELLING) {
 				if (transitionState(current, JobStatus.CANCELED)) {
-					postRunCleanup();
+					onTerminalState(JobStatus.CANCELED);
 					break;
 				}
 			}
@@ -1221,7 +1357,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				break;
 			}
 			else {
-				fail(new Exception("ExecutionGraph went into final state from state " + current));
+				failGlobal(new Exception("ExecutionGraph went into final state from state " + current));
 				break;
 			}
 		}
@@ -1255,18 +1391,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					restartStrategy.restart(this);
 
 					return true;
-				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
-					final List<String> reasonsForNoRestart = new ArrayList<>(2);
-					if (!isFailureCauseAllowingRestart) {
-						reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
-					}
-					if (!isRestartStrategyAllowingRestart) {
-						reasonsForNoRestart.add("the restart strategy prevented it");
-					}
+				}
+				else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
+					final String cause1 = isFailureCauseAllowingRestart ? null :  
+							"a type of SuppressRestartsException was thrown";
+					final String cause2 = isRestartStrategyAllowingRestart ? null :
+						"the restart strategy prevented it";
 
 					LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
-						StringUtils.join(reasonsForNoRestart, " and "), failureCause);
-					postRunCleanup();
+						StringUtils.concatenateWithAnd(cause1, cause2), failureCause);
+					onTerminalState(JobStatus.FAILED);
 
 					return true;
 				} else {
@@ -1280,16 +1414,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	private void postRunCleanup() {
+	private void onTerminalState(JobStatus status) {
 		try {
 			CheckpointCoordinator coord = this.checkpointCoordinator;
 			this.checkpointCoordinator = null;
 			if (coord != null) {
-				coord.shutdown(state);
+				coord.shutdown(status);
 			}
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			LOG.error("Error while cleaning up after execution", e);
 		}
+		finally {
+			terminationFuture.complete(status);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1343,7 +1481,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
 				// failures during updates leave the ExecutionGraph inconsistent
-				fail(t);
+				failGlobal(t);
 				return false;
 			}
 		}
@@ -1405,7 +1543,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	void registerExecution(Execution exec) {
 		Execution previous = currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
 		if (previous != null) {
-			fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
+			failGlobal(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
 		}
 	}
 
@@ -1413,7 +1551,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		Execution contained = currentExecutions.remove(exec.getAttemptId());
 
 		if (contained != null && contained != exec) {
-			fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
+			failGlobal(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
 		}
 	}
 
@@ -1471,21 +1609,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	void notifyExecutionChange(
-			JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
-			ExecutionState newExecutionState, Throwable error)
-	{
-		ExecutionJobVertex vertex = getJobVertex(vertexId);
+			final Execution execution,
+			final ExecutionState newExecutionState,
+			final Throwable error) {
 
 		if (executionListeners.size() > 0) {
+			final ExecutionJobVertex vertex = execution.getVertex().getJobVertex();
 			final String message = error == null ? null : ExceptionUtils.stringifyException(error);
 			final long timestamp = System.currentTimeMillis();
 
 			for (ExecutionStatusListener listener : executionListeners) {
 				try {
 					listener.executionStatusChanged(
-							getJobID(), vertexId, vertex.getJobVertex().getName(),
-							vertex.getParallelism(), subtask, executionID, newExecutionState,
-							timestamp, message);
+							getJobID(), vertex.getJobVertexId(), vertex.getJobVertex().getName(),
+							vertex.getParallelism(), execution.getParallelSubtaskIndex(),
+							execution.getAttemptId(), newExecutionState, timestamp, message);
 				} catch (Throwable t) {
 					LOG.warn("Error while notifying ExecutionStatusListener", t);
 				}
@@ -1494,7 +1632,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		// see what this means for us. currently, the first FAILED state means -> FAILED
 		if (newExecutionState == ExecutionState.FAILED) {
-			fail(error);
+			final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
+
+			// by filtering out late failure calls, we can save some work in
+			// avoiding redundant local failover
+			if (execution.getGlobalModVersion() == globalModVersion) {
+				try {
+					failoverStrategy.onTaskFailure(execution, ex);
+				}
+				catch (Throwable t) {
+					// bug in the failover strategy - fall back to global failover
+					LOG.warn("Error in failover strategy - falling back to global restart", t);
+					failGlobal(ex);
+				}
+			}
 		}
 	}
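
A hedged usage sketch of the termination future added above; the test helper that builds the
graph is an assumption, only scheduleForExecution(), failGlobal(...), waitUntilTerminal() and
JobStatus#isTerminalState() come from the code in this commit.

    // e.g. in a test, after triggering a failure that is not restarted:
    ExecutionGraph graph = createExecutionGraph();      // hypothetical test helper
    graph.scheduleForExecution();
    graph.failGlobal(new Exception("test failure"));

    // blocks until the job reaches a terminal state (FAILED / CANCELED / FINISHED / ...)
    JobStatus terminalState = graph.waitUntilTerminal();
    assertTrue(terminalState.isTerminalState());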
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index b40817f..88863e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -34,7 +34,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -90,6 +93,9 @@ public class ExecutionGraphBuilder {
 		final String jobName = jobGraph.getName();
 		final JobID jobId = jobGraph.getJobID();
 
+		final FailoverStrategy.Factory failoverStrategy = 
+				FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
+
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph = (prior != null) ? prior :
 				new ExecutionGraph(
@@ -101,6 +107,7 @@ public class ExecutionGraphBuilder {
 						jobGraph.getSerializedExecutionConfig(),
 						timeout,
 						restartStrategy,
+						failoverStrategy,
 						jobGraph.getUserJarBlobKeys(),
 						jobGraph.getClasspaths(),
 						slotProvider,
@@ -269,6 +276,9 @@ public class ExecutionGraphBuilder {
 		metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
 		metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
 		metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
+		metrics.gauge(NumberOfFullRestartsGauge.METRIC_NAME, new NumberOfFullRestartsGauge(executionGraph));
+
+		executionGraph.getFailoverStrategy().registerMetrics(metrics);
 
 		return executionGraph;
 	}
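
As a hedged sketch of what a gauge like the one registered above boils down to (the class name
below is invented; the actual NumberOfFullRestartsGauge added by this commit may differ in
detail), it essentially wraps ExecutionGraph#getNumberOfFullRestarts():

    // illustrative only - exposes the counter derived from the global mod version
    public class FullRestartsGaugeSketch implements Gauge<Long> {

        private final ExecutionGraph executionGraph;

        public FullRestartsGaugeSketch(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
        }

        @Override
        public Long getValue() {
            // globalModVersion - 1, see ExecutionGraph#getNumberOfFullRestarts()
            return executionGraph.getNumberOfFullRestarts();
        }
    }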

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3197e65..3a98e0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -28,7 +29,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.concurrent.Future;
@@ -58,6 +58,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer 
+ * to the {@link JobVertex}.
+ * 
+ * <p>The {@code ExecutionJobVertex} corresponds to a parallelized operation. It
+ * contains an {@link ExecutionVertex} for each parallel instance of that operation.
+ */
 public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
 
 	/** Use the same log for all ExecutionGraph classes */
@@ -115,21 +122,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	private SerializedValue<TaskInformation> serializedTaskInformation;
 
 	private InputSplitAssigner splitAssigner;
-	
-	public ExecutionJobVertex(
+
+	/**
+	 * Convenience constructor for testing.
+	 */
+	@VisibleForTesting
+	ExecutionJobVertex(
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
 		Time timeout) throws JobException {
 
-		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
+		this(graph, jobVertex, defaultParallelism, timeout, 1L, System.currentTimeMillis());
 	}
-	
+
 	public ExecutionJobVertex(
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
 		Time timeout,
+		long initialGlobalModVersion,
 		long createTimestamp) throws JobException {
 
 		if (graph == null || jobVertex == null) {
@@ -190,18 +202,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {
 			ExecutionVertex vertex = new ExecutionVertex(
-					this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+					this,
+					i,
+					producedDataSets,
+					timeout,
+					initialGlobalModVersion,
+					createTimestamp,
+					maxPriorAttemptsHistoryLength);
 
 			this.taskVertices[i] = vertex;
 		}
-		
+
 		// sanity check for the double referencing between intermediate result partitions and execution vertices
 		for (IntermediateResult ir : this.producedDataSets) {
 			if (ir.getNumberOfAssignedPartitions() != parallelism) {
 				throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
 			}
 		}
-		
+
 		// set up the input splits, if the vertex has any
 		try {
 			@SuppressWarnings("unchecked")
@@ -508,7 +526,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 	}
 
-	public void resetForNewExecution() {
+	public void resetForNewExecution(final long timestamp, final long expectedGlobalModVersion)
+			throws GlobalModVersionMismatch {
 
 		synchronized (stateMonitor) {
 			// check and reset the sharing groups with scheduler hints
@@ -517,7 +536,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			}
 
 			for (int i = 0; i < parallelism; i++) {
-				taskVertices[i].resetForNewExecution();
+				taskVertices[i].resetForNewExecution(timestamp, expectedGlobalModVersion);
 			}
 
 			// set up the input splits again
@@ -558,112 +577,36 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Static / pre-assigned input splits
+	//  Archiving
 	// --------------------------------------------------------------------------------------------
 
-	private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
-		
-		final int numSubTasks = getParallelism();
-		
-		// sanity check
-		if (numSubTasks > splits.length) {
-			throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
-		}
-		
-		// group the splits by host while preserving order per host
-		Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
-		
-		for (InputSplit split : splits) {
-			// check that split has exactly one local host
-			if(!(split instanceof LocatableInputSplit)) {
-				throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
-						"Strictly local assignment requires LocatableInputSplit");
-			}
-			LocatableInputSplit lis = (LocatableInputSplit) split;
-
-			if (lis.getHostnames() == null) {
-				throw new JobException("LocatableInputSplit has no host information. " +
-						"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
-			}
-			else if (lis.getHostnames().length != 1) {
-				throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
-			}
-			String hostName = lis.getHostnames()[0];
-			
-			if (hostName == null) {
-				throw new JobException("For strictly local input split assignment, no null host names are allowed.");
-			}
-
-			List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
-			if (hostSplits == null) {
-				hostSplits = new ArrayList<LocatableInputSplit>();
-				splitsByHost.put(hostName, hostSplits);
-			}
-			hostSplits.add(lis);
-		}
-		
-		
-		int numHosts = splitsByHost.size();
-		
-		if (numSubTasks < numHosts) {
-			throw new JobException("Strictly local split assignment requires at least as " +
-					"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
-					"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
-		}
-
-		// get list of hosts in deterministic order
-		List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
-		Collections.sort(hosts);
-		
-		@SuppressWarnings("unchecked")
-		List<LocatableInputSplit>[] subTaskSplitAssignment = (List<LocatableInputSplit>[]) new List<?>[numSubTasks];
-		
-		final int subtasksPerHost = numSubTasks / numHosts;
-		final int hostsWithOneMore = numSubTasks % numHosts;
-		
-		int subtaskNum = 0;
-		
-		// we go over all hosts and distribute the hosts' input splits
-		// over the subtasks
-		for (int hostNum = 0; hostNum < numHosts; hostNum++) {
-			String host = hosts.get(hostNum);
-			List<LocatableInputSplit> splitsOnHost = splitsByHost.get(host);
-			
-			int numSplitsOnHost = splitsOnHost.size();
-			
-			// the number of subtasks to split this over.
-			// NOTE: if the host has few splits, some subtasks will not get anything.
-			int subtasks = Math.min(numSplitsOnHost, 
-							hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);
-			
-			int splitsPerSubtask = numSplitsOnHost / subtasks;
-			int subtasksWithOneMore = numSplitsOnHost % subtasks;
-			
-			int splitnum = 0;
-			
-			// go over the subtasks and grab a subrange of the input splits
-			for (int i = 0; i < subtasks; i++) {
-				int numSplitsForSubtask = (i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask);
-				
-				List<LocatableInputSplit> splitList;
-				
-				if (numSplitsForSubtask == numSplitsOnHost) {
-					splitList = splitsOnHost;
-				}
-				else {
-					splitList = new ArrayList<LocatableInputSplit>(numSplitsForSubtask);
-					for (int k = 0; k < numSplitsForSubtask; k++) {
-						splitList.add(splitsOnHost.get(splitnum++));
-					}
-				}
-				
-				subTaskSplitAssignment[subtaskNum++] = splitList;
-			}
-		}
-		
-		return subTaskSplitAssignment;
+	@Override
+	public ArchivedExecutionJobVertex archive() {
+		return new ArchivedExecutionJobVertex(this);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Static Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A utility function that computes an "aggregated" state for the vertex.
+	 * 
+	 * <p>This state is not used anywhere in the coordination, but can be used for display
+	 * in dashboards as a summary of how the particular parallel operation represented by
+	 * this ExecutionJobVertex is currently behaving.
+	 * 
+	 * <p>For example, if at least one parallel task has failed, the aggregate state is failed.
+	 * If not, and at least one parallel task is cancelling (or cancelled), the aggregate state
+	 * is cancelling (or cancelled). If all tasks are finished, the aggregate state is finished,
+	 * and so on.
+	 * 
+	 * @param verticesPerState The number of vertices in each state (indexed by the ordinal of
+	 *                         the ExecutionState values).
+	 * @param parallelism The parallelism of the ExecutionJobVertex
+	 * 
+	 * @return The aggregate state of this ExecutionJobVertex. 
+	 */
 	public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
 		if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
 			throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
@@ -739,9 +682,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 		return expanded;
 	}
-
-	@Override
-	public ArchivedExecutionJobVertex archive() {
-		return new ArchivedExecutionJobVertex(this);
-	}
 }
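
A hedged usage sketch for the aggregation utility documented above; the counts are made up and
the expected result follows the precedence rules from the Javadoc.

    // count the subtask states by ExecutionState ordinal, then aggregate
    final int parallelism = 4;
    final int[] verticesPerState = new int[ExecutionState.values().length];
    verticesPerState[ExecutionState.RUNNING.ordinal()]   = 3;
    verticesPerState[ExecutionState.CANCELING.ordinal()] = 1;

    final ExecutionState aggregate =
            ExecutionJobVertex.getAggregateJobVertexState(verticesPerState, parallelism);

    // no subtask failed, but one is cancelling, so the aggregate should be CANCELING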

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index bcf7a7c..e8c1984 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -97,7 +98,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	// --------------------------------------------------------------------------------------------
 
-	public ExecutionVertex(
+	/**
+	 * Convenience constructor for tests. Sets various fields to default values.
+	 */
+	@VisibleForTesting
+	ExecutionVertex(
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
@@ -108,24 +113,28 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				subTaskIndex,
 				producedDataSets,
 				timeout,
+				1L,
 				System.currentTimeMillis(),
 				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
 	}
 
+	/**
+	 * Creates an ExecutionVertex.
+	 * 
+	 * @param timeout
+	 *            The RPC timeout to use for deploy / cancel calls
+	 * @param initialGlobalModVersion
+	 *            The global modification version to initialize the first Execution with.
+	 * @param createTimestamp
+	 *            The timestamp for the vertex creation, used to initialize the first Execution with.
+	 * @param maxPriorExecutionHistoryLength
+	 *            The number of prior Executions (= execution attempts) to keep.
+	 */
 	public ExecutionVertex(
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout,
-			int maxPriorExecutionHistoryLength) {
-		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
-	}
-
-	public ExecutionVertex(
-			ExecutionJobVertex jobVertex,
-			int subTaskIndex,
-			IntermediateResult[] producedDataSets,
-			Time timeout,
+			long initialGlobalModVersion,
 			long createTimestamp,
 			int maxPriorExecutionHistoryLength) {
 
@@ -151,6 +160,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			getExecutionGraph().getFutureExecutor(),
 			this,
 			0,
+			initialGlobalModVersion,
 			createTimestamp,
 			timeout);
 
@@ -163,6 +173,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			this.locationConstraint = null;
 		}
 
+		getExecutionGraph().registerExecution(currentExecution);
+
 		this.timeout = timeout;
 	}
 
@@ -508,11 +520,40 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//   Actions
 	// --------------------------------------------------------------------------------------------
 
-	public Execution resetForNewExecution() {
-
+	/**
+	 * Archives the current Execution and creates a new Execution for this vertex.
+	 * 
+	 * <p>This method atomically checks if the ExecutionGraph is still at the expected
+	 * global mod. version and replaces the execution if that is the case. If the ExecutionGraph
+	 * has increased its global mod. version in the meantime, this operation fails.
+	 * 
+	 * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and
+	 * reconfiguration actions in a similar way as "optimistic concurrency control".
+	 * 
+	 * @param timestamp
+	 *             The creation timestamp for the new Execution
+	 * @param originatingGlobalModVersion
+	 *             The global modification version of the execution graph on which the
+	 *             resetting/restarting action is based
+	 * 
+	 * @return The newly created Execution.
+	 * 
+	 * @throws GlobalModVersionMismatch Thrown if the execution graph has a newer global mod
+	 *                                  version than the one passed to this method.
+	 */
+	public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion)
+			throws GlobalModVersionMismatch
+	{
 		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
+			// check if another global modification has been triggered since the
+			// action that originally caused this reset/restart happened
+			final long actualModVersion = getExecutionGraph().getGlobalModVersion();
+			if (actualModVersion > originatingGlobalModVersion) {
+				// global change happened since, reject this action
+				throw new GlobalModVersionMismatch(originatingGlobalModVersion, actualModVersion);
+			}
+
 			final Execution oldExecution = currentExecution;
 			final ExecutionState oldState = oldExecution.getState();
 
@@ -522,8 +563,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				final Execution newExecution = new Execution(
 					getExecutionGraph().getFutureExecutor(),
 					this,
-						oldExecution.getAttemptNumber()+1,
-					System.currentTimeMillis(),
+					oldExecution.getAttemptNumber() + 1,
+					originatingGlobalModVersion,
+					timestamp,
 					timeout);
 
 				this.currentExecution = newExecution;
@@ -533,6 +575,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
 
+				// register this execution at the execution graph, to receive call backs
+				getExecutionGraph().registerExecution(newExecution);
+
 				// if the execution was 'FINISHED' before, tell the ExecutionGraph that
 				// we take one step back on the road to reaching global FINISHED
 				if (oldState == FINISHED) {
@@ -640,9 +685,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	// --------------------------------------------------------------------------------------------
 
 	void executionFinished(Execution execution) {
-		if (execution == currentExecution) {
-			getExecutionGraph().vertexFinished();
-		}
+		getExecutionGraph().vertexFinished();
 	}
 
 	void executionCanceled(Execution execution) {
@@ -658,10 +701,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Simply forward this notification. This is for logs and event archivers.
+	 * Simply forward this notification.
 	 */
-	void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
-		getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
+	void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
+		// only forward this notification if the execution is still the current execution
+		// otherwise we have an outdated execution
+		if (currentExecution == execution) {
+			getExecutionGraph().notifyExecutionChange(execution, newState, error);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
new file mode 100644
index 0000000..bc96805
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * An exception that indicates a mismatch between the expected global modification version
+ * of the execution graph, and the actual modification version.
+ */
+public class GlobalModVersionMismatch extends Exception {
+
+	private static final long serialVersionUID = 6643688395797045098L;
+
+	private final long expectedModVersion;
+
+	private final long actualModVersion;
+
+	public GlobalModVersionMismatch(long expectedModVersion, long actualModVersion) {
+		super("expected=" + expectedModVersion + ", actual=" + actualModVersion);
+		this.expectedModVersion = expectedModVersion;
+		this.actualModVersion = actualModVersion;
+	}
+
+	public long expectedModVersion() {
+		return expectedModVersion;
+	}
+
+	public long actualModVersion() {
+		return actualModVersion;
+	}
+}
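
A hedged, test-style sketch of the contract described in ExecutionVertex#resetForNewExecution
above: once a global action bumps the version, a reset based on the stale version must fail
with the GlobalModVersionMismatch just shown. The setup helper and the vertex lookup are
assumptions; getGlobalModVersion() is package-private, so this assumes a test in the same package.

    ExecutionGraph graph = createGraphWithOneVertex();                // hypothetical test helper
    ExecutionVertex vertex = graph.getAllExecutionVertices().iterator().next();

    final long staleVersion = graph.getGlobalModVersion();
    graph.failGlobal(new Exception("concurrent global failover"));    // bumps the version

    try {
        vertex.resetForNewExecution(System.currentTimeMillis(), staleVersion);
        fail("expected GlobalModVersionMismatch");
    }
    catch (GlobalModVersionMismatch expected) {
        // the stale local action yields to the global failover
    }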

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
new file mode 100644
index 0000000..2c4313f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.List;
+
+/**
+ * A {@code FailoverStrategy} describes how the job computation recovers from task
+ * failures.
+ * 
+ * <p>Failover strategies implement recovery logic for failures of tasks. The execution
+ * graph still implements "global failure / recovery" (which restarts all tasks) as
+ * a fallback plan or safety net in cases where it deems that the state of the graph
+ * may have become inconsistent.
+ */
+public abstract class FailoverStrategy {
+
+
+	// ------------------------------------------------------------------------
+	//  failover implementation
+	// ------------------------------------------------------------------------ 
+
+	/**
+	 * Called by the execution graph when a task failure occurs.
+	 * 
+	 * @param taskExecution The execution attempt of the failed task. 
+	 * @param cause The exception that caused the task failure.
+	 */
+	public abstract void onTaskFailure(Execution taskExecution, Throwable cause);
+
+	/**
+	 * Called whenever new vertices are added to the ExecutionGraph.
+	 * 
+	 * @param newJobVerticesTopological The newly added vertices, in topological order.
+	 */
+	public abstract void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological);
+
+	/**
+	 * Gets the name of the failover strategy, for logging purposes.
+	 */
+	public abstract String getStrategyName();
+
+	/**
+	 * Tells the FailoverStrategy to register its metrics.
+	 * 
+	 * <p>The default implementation does nothing.
+	 * 
+	 * @param metricGroup The metric group to register the metrics at
+	 */
+	public void registerMetrics(MetricGroup metricGroup) {}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This factory is a necessary indirection when creating the FailoverStrategy, so that
+	 * we can have both the FailoverStrategy final in the ExecutionGraph, and the
+	 * ExecutionGraph final in the FailoverStrategy.
+	 */
+	public interface Factory {
+
+		/**
+		 * Instantiates the {@code FailoverStrategy}.
+		 * 
+		 * @param executionGraph The execution graph for which the strategy implements failover.
+		 * @return The instantiated failover strategy.
+		 */
+		FailoverStrategy create(ExecutionGraph executionGraph);
+	}
+}
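
As an illustration of the abstraction above (not part of this commit), a minimal custom strategy could look like the following sketch. The class name LoggingFailoverStrategy is a placeholder; it simply logs each failure and escalates to a global restart, and it plugs in through the same Factory indirection described in the Javadoc.

package org.apache.flink.runtime.executiongraph.failover;

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Sketch of a custom failover strategy: log the failure, then fall back to a full restart.
 */
public class LoggingFailoverStrategy extends FailoverStrategy {

	private static final Logger LOG = LoggerFactory.getLogger(LoggingFailoverStrategy.class);

	/** The execution graph to recover */
	private final ExecutionGraph executionGraph;

	public LoggingFailoverStrategy(ExecutionGraph executionGraph) {
		this.executionGraph = checkNotNull(executionGraph);
	}

	@Override
	public void onTaskFailure(Execution taskExecution, Throwable cause) {
		LOG.info("Task {} (attempt #{}) failed - escalating to a global failure.",
				taskExecution.getVertex().getTaskNameWithSubtaskIndex(),
				taskExecution.getAttemptNumber(), cause);

		executionGraph.failGlobal(cause);
	}

	@Override
	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
		// this strategy keeps no per-vertex state
	}

	@Override
	public String getStrategyName() {
		return "logging full restart";
	}

	public static class Factory implements FailoverStrategy.Factory {
		@Override
		public FailoverStrategy create(ExecutionGraph executionGraph) {
			return new LoggingFailoverStrategy(executionGraph);
		}
	}
}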

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
new file mode 100644
index 0000000..f18a90f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+/**
+ * A utility class to load failover strategies from the configuration. 
+ */
+public class FailoverStrategyLoader {
+
+	/** Config name for the {@link RestartAllStrategy} */
+	public static final String FULL_RESTART_STRATEGY_NAME = "full";
+
+	/** Config name for the strategy that restarts individual tasks */
+	public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads a FailoverStrategy Factory from the given configuration.
+	 */
+	public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) {
+		final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);
+
+		if (StringUtils.isNullOrWhitespaceOnly(strategyParam)) {
+			if (logger != null) {
+				logger.warn("Null config value for {} ; using default failover strategy (full restarts).",
+						JobManagerOptions.EXECUTION_FAILOVER_STRATEGY.key());
+			}
+
+			return new RestartAllStrategy.Factory();
+		}
+		else {
+			switch (strategyParam.toLowerCase()) {
+				case FULL_RESTART_STRATEGY_NAME:
+					return new RestartAllStrategy.Factory();
+
+				case INDIVIDUAL_RESTART_STRATEGY_NAME:
+					return new RestartIndividualStrategy.Factory();
+
+				default:
+					// we could interpret the parameter as a factory class name and instantiate that
+					// for now we simply do not support this
+					throw new IllegalConfigurationException("Unknown failover strategy: " + strategyParam);
+			}
+		}
+	}
+}
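
For illustration (not part of this commit), the loader above is driven by the jobmanager configuration. The sketch below assumes only the classes introduced in this change; the wrapper class name is a placeholder, and the option's key() accessor is used so that no config key string has to be hard-coded.

package org.apache.flink.runtime.executiongraph.failover;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

import org.slf4j.LoggerFactory;

public class FailoverStrategySelectionExample {

	public static void main(String[] args) {
		Configuration config = new Configuration();

		// "individual" selects RestartIndividualStrategy, "full" (or an empty value) selects RestartAllStrategy
		config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY.key(), "individual");

		FailoverStrategy.Factory factory = FailoverStrategyLoader.loadFailoverStrategy(
				config, LoggerFactory.getLogger(FailoverStrategySelectionExample.class));

		// the factory is invoked later with the ExecutionGraph it should guard:
		// FailoverStrategy strategy = factory.create(executionGraph);
		System.out.println("Loaded failover strategy factory: " + factory.getClass().getName());
	}
}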

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
new file mode 100644
index 0000000..21f42b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple failover strategy that triggers a restart of all tasks in the
+ * execution graph, via {@link ExecutionGraph#failGlobal(Throwable)}.
+ */
+public class RestartAllStrategy extends FailoverStrategy {
+
+	/** The execution graph to recover */
+	private final ExecutionGraph executionGraph;
+
+	/**
+	 * Creates a new failover strategy that recovers from failures by restarting all tasks
+	 * of the execution graph.
+	 * 
+	 * @param executionGraph The execution graph to handle.
+	 */
+	public RestartAllStrategy(ExecutionGraph executionGraph) {
+		this.executionGraph = checkNotNull(executionGraph);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void onTaskFailure(Execution taskExecution, Throwable cause) {
+		// this strategy makes every task failure a global failure
+		executionGraph.failGlobal(cause);
+	}
+
+	@Override
+	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+		// nothing to do
+	}
+
+	@Override
+	public String getStrategyName() {
+		return "full graph restart";
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory that instantiates the RestartAllStrategy.
+	 */
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			return new RestartAllStrategy(executionGraph);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
new file mode 100644
index 0000000..0a449b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple failover strategy that restarts each task individually.
+ * This strategy is only applicable if the entire job consists of unconnected
+ * tasks, meaning each task is its own component.
+ */
+public class RestartIndividualStrategy extends FailoverStrategy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RestartIndividualStrategy.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The execution graph to recover */
+	private final ExecutionGraph executionGraph;
+
+	/** The executor that executes restart callbacks */
+	private final Executor callbackExecutor;
+
+	private final SimpleCounter numTaskFailures;
+
+	/**
+	 * Creates a new failover strategy that recovers from failures by restarting only
+	 * the failed task.
+	 * 
+	 * <p>The strategy will use the ExecutionGraph's future executor for callbacks.
+	 * 
+	 * @param executionGraph The execution graph to handle.
+	 */
+	public RestartIndividualStrategy(ExecutionGraph executionGraph) {
+		this(executionGraph, executionGraph.getFutureExecutor());
+	}
+
+	/**
+	 * Creates a new failover strategy that recovers from failures by restarting only
+	 * the failed task.
+	 *
+	 * @param executionGraph The execution graph to handle.
+	 * @param callbackExecutor The executor that executes restart callbacks
+	 */
+	public RestartIndividualStrategy(ExecutionGraph executionGraph, Executor callbackExecutor) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.callbackExecutor = checkNotNull(callbackExecutor);
+
+		this.numTaskFailures = new SimpleCounter();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void onTaskFailure(Execution taskExecution, Throwable cause) {
+
+		// to better handle the lack of resources (potentially by a scale-in), we
+		// make failures due to missing resources global failures 
+		if (cause instanceof NoResourceAvailableException) {
+			LOG.info("Not enough resources to schedule {} - triggering full recovery.", taskExecution);
+			executionGraph.failGlobal(cause);
+			return;
+		}
+
+		LOG.info("Recovering task failure for {} (#{}) via individual restart.", 
+				taskExecution.getVertex().getTaskNameWithSubtaskIndex(), taskExecution.getAttemptNumber());
+
+		numTaskFailures.inc();
+
+		// trigger the restart once the task has reached its terminal state
+		// Note: currently all tasks passed here are already in their terminal state,
+		//       so we could actually avoid the future. We use it anyways because it is cheap and
+		//       it helps to support better testing
+		final Future<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+
+		final ExecutionVertex vertexToRecover = taskExecution.getVertex(); 
+		final long globalModVersion = taskExecution.getGlobalModVersion();
+
+		terminationFuture.thenAcceptAsync(new AcceptFunction<ExecutionState>() {
+			@Override
+			public void accept(ExecutionState value) {
+				try {
+					long createTimestamp = System.currentTimeMillis();
+					Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
+					newExecution.scheduleForExecution();
+				}
+				catch (GlobalModVersionMismatch e) {
+					// this happens if a concurrent global recovery happens. simply do nothing.
+				}
+				catch (Exception e) {
+					executionGraph.failGlobal(
+							new Exception("Error during fine grained recovery - triggering full recovery", e));
+				}
+			}
+		}, callbackExecutor);
+	}
+
+	@Override
+	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+		// we validate here that the vertices are in fact not connected to
+		// any other vertices
+		for (ExecutionJobVertex ejv : newJobVerticesTopological) {
+			List<IntermediateResult> inputs = ejv.getInputs();
+			IntermediateResult[] outputs = ejv.getProducedDataSets();
+
+			if ((inputs != null && inputs.size() > 0) || (outputs != null && outputs.length > 0)) {
+				throw new FlinkRuntimeException("Incompatible failover strategy - strategy '" + 
+						getStrategyName() + "' can only handle jobs with only disconnected tasks.");
+			}
+		}
+	}
+
+	@Override
+	public String getStrategyName() {
+		return "Individual Task Restart";
+	}
+
+	@Override
+	public void registerMetrics(MetricGroup metricGroup) {
+		metricGroup.counter("task_failures", numTaskFailures);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory that instantiates the RestartIndividualStrategy.
+	 */
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public RestartIndividualStrategy create(ExecutionGraph executionGraph) {
+			return new RestartIndividualStrategy(executionGraph);
+		}
+	}
+}
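
A sketch of how the connectivity precondition in notifyNewVertices() could be exercised in a unit test (not part of this commit). It assumes the usual JUnit and Mockito static imports used by the surrounding tests (mock, when, fail), plus java.util.Collections and java.util.concurrent.Executor.

	@Test
	public void testRejectsVerticesWithInputs() throws Exception {
		final ExecutionGraph graph = mock(ExecutionGraph.class);

		// a direct executor is sufficient, since this test never triggers a restart callback
		final RestartIndividualStrategy strategy = new RestartIndividualStrategy(graph, new Executor() {
			@Override
			public void execute(Runnable command) {
				command.run();
			}
		});

		// a vertex that has an input is, by definition, connected to another vertex
		final ExecutionJobVertex connectedVertex = mock(ExecutionJobVertex.class);
		when(connectedVertex.getInputs())
				.thenReturn(Collections.singletonList(mock(IntermediateResult.class)));

		try {
			strategy.notifyNewVertices(Collections.singletonList(connectedVertex));
			fail("expected a FlinkRuntimeException for a vertex with inputs");
		}
		catch (FlinkRuntimeException expected) {
			// the strategy only accepts jobs made up of disconnected tasks
		}
	}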

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
new file mode 100644
index 0000000..05a6414
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge which returns the number of full restarts.
+ */
+public class NumberOfFullRestartsGauge implements Gauge<Long> {
+
+	public static final String METRIC_NAME = "fullRestarts";
+
+	// ------------------------------------------------------------------------
+
+	private final ExecutionGraph eg;
+
+	public NumberOfFullRestartsGauge(ExecutionGraph executionGraph) {
+		this.eg = checkNotNull(executionGraph);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Long getValue() {
+		return eg.getNumberOfFullRestarts();
+	}
+}
\ No newline at end of file
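
A registration sketch for this gauge (not part of this commit); the metricGroup and executionGraph variables are assumed to be in scope, and gauge() is the standard MetricGroup registration call:

	metricGroup.gauge(
			NumberOfFullRestartsGauge.METRIC_NAME,
			new NumberOfFullRestartsGauge(executionGraph));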

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 99dbc86..e60ff77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -376,7 +376,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					executionGraph.scheduleForExecution();
 				}
 				catch (Throwable t) {
-					executionGraph.fail(t);
+					executionGraph.failGlobal(t);
 				}
 			}
 		});


[4/5] flink git commit: [FLINK-6340] [flip-1] Add a termination future to the Execution

Posted by se...@apache.org.
[FLINK-6340] [flip-1] Add a termination future to the Execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0061272
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0061272
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0061272

Branch: refs/heads/master
Commit: e00612726991a05058168e5a4fbfb53853e645a5
Parents: aadfe45
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:49:54 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  26 ++-
 .../runtime/executiongraph/ExecutionGraph.java  | 182 ++++++++++------
 .../executiongraph/ExecutionJobVertex.java      | 116 +++-------
 .../runtime/executiongraph/ExecutionVertex.java |  56 +++--
 .../ExecutionGraphRestartTest.java              |  78 ++++---
 .../executiongraph/ExecutionGraphTestUtils.java |   8 +-
 .../ExecutionStateProgressTest.java             |  94 --------
 .../TerminalStateDeadlockTest.java              | 216 -------------------
 8 files changed, 250 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 729e161..2680849 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -121,6 +121,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
+	/** A future that completes once the Execution reaches a terminal ExecutionState */
+	private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+
 	private volatile ExecutionState state = CREATED;
 
 	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
@@ -161,6 +164,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
+		this.terminationFuture = new FlinkCompletableFuture<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -234,6 +238,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.taskState = checkpointStateHandles;
 	}
 
+	/**
+	 * Gets a future that completes once the task execution reaches a terminal state.
+	 * The future will be completed with the specific state that the execution reached.
+	 *
+	 * @return A future for the execution's termination
+	 */
+	public Future<ExecutionState> getTerminationFuture() {
+		return terminationFuture;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -473,7 +487,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						}
 					}
 					finally {
-						vertex.executionCanceled();
+						vertex.executionCanceled(this);
+						terminationFuture.complete(CANCELED);
 					}
 					return;
 				}
@@ -741,7 +756,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.executionFinished();
+						vertex.executionFinished(this);
+						terminationFuture.complete(FINISHED);
 					}
 					return;
 				}
@@ -793,7 +809,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
-						vertex.executionCanceled();
+						vertex.executionCanceled(this);
+						terminationFuture.complete(CANCELED);
 					}
 					return;
 				}
@@ -886,7 +903,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					vertex.getExecutionGraph().deregisterExecution(this);
 				}
 				finally {
-					vertex.executionFailed(t);
+					vertex.executionFailed(this, t);
+					terminationFuture.complete(FAILED);
 				}
 
 				if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
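
As a usage sketch for the new future (the in-tree callers are the fine-grained failover strategy and the ExecutionGraph cancellation path), a component that needs to react to an attempt reaching a terminal state can simply chain off it. The method name, LOG, and the callbackExecutor parameter below are placeholders for whatever the caller already has in scope.

	void logWhenTerminated(final Execution execution, Executor callbackExecutor) {
		execution.getTerminationFuture().thenAcceptAsync(new AcceptFunction<ExecutionState>() {
			@Override
			public void accept(ExecutionState terminalState) {
				// terminalState is FINISHED, CANCELED, or FAILED, depending on how the attempt ended
				LOG.info("Attempt #{} of task {} terminated in state {}.",
						execution.getAttemptNumber(),
						execution.getVertex().getTaskNameWithSubtaskIndex(),
						terminalState);
			}
		}, callbackExecutor);
	}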

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 23ed99d..fff1ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -89,6 +90,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -188,6 +190,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Registered KvState instances reported by the TaskManagers. */
 	private final KvStateLocationRegistry kvStateLocationRegistry;
 
+	private int numVerticesTotal;
+
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -203,6 +207,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
+	private final AtomicInteger verticesFinished;
+
 	/** Current status of the job execution */
 	private volatile JobStatus state = JobStatus.CREATED;
 
@@ -210,9 +216,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * that was not recoverable and triggered job failure */
 	private volatile Throwable failureCause;
 
-	/** The number of job vertices that have reached a terminal state */
-	private volatile int numFinishedJobVertices;
-
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
@@ -317,6 +320,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		this.restartStrategy = restartStrategy;
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
+
+		this.verticesFinished = new AtomicInteger();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -454,7 +459,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			return jv.getTaskVertices();
 		}
 		else {
-			ArrayList<ExecutionVertex> all = new ArrayList<ExecutionVertex>();
+			ArrayList<ExecutionVertex> all = new ArrayList<>();
 			for (ExecutionJobVertex jv : jobVertices) {
 				if (jv.getGraph() != this) {
 					throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
@@ -586,6 +591,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		};
 	}
 
+	public int getTotalNumberOfVertices() {
+		return numVerticesTotal;
+	}
+
 	public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
 		return Collections.unmodifiableMap(this.intermediateResults);
 	}
@@ -620,7 +629,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 */
 	public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
 
-		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
 
 		for (ExecutionVertex vertex : getAllExecutionVertices()) {
 			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -657,7 +666,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 
-		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+		Map<String, SerializedValue<Object>> result = new HashMap<>();
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
 			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
 		}
@@ -713,6 +722,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 
 			this.verticesInCreationOrder.add(ejv);
+			this.numVerticesTotal += ejv.getParallelism();
 		}
 	}
 
@@ -878,9 +888,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
+
+					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+					// cancel all tasks (that still need cancelling)
 					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-						ejv.cancel();
+						futures.add(ejv.cancelWithFuture());
 					}
+
+					// we build a future that is complete once all vertices have reached a terminal state
+					final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					allTerminal.thenAccept(new AcceptFunction<Void>() {
+						@Override
+						public void accept(Void value) {
+							allVerticesInTerminalState();
+						}
+					});
+
 					return;
 				}
 			}
@@ -968,26 +992,33 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				current == JobStatus.SUSPENDED ||
 				current.isGloballyTerminalState()) {
 				return;
-			} else if (current == JobStatus.RESTARTING) {
+			}
+			else if (current == JobStatus.RESTARTING) {
 				this.failureCause = t;
 
 				if (tryRestartOrFail()) {
 					return;
 				}
-				// concurrent job status change, let's check again
-			} else if (transitionState(current, JobStatus.FAILING, t)) {
+			}
+			else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
-				if (!verticesInCreationOrder.isEmpty()) {
-					// cancel all. what is failed will not cancel but stay failed
-					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-						ejv.cancel();
-					}
-				} else {
-					// set the state of the job to failed
-					transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
+				// we build a future that is complete once all vertices have reached a terminal state
+				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+
+				// cancel all tasks (that still need cancelling)
+				for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+					futures.add(ejv.cancelWithFuture());
 				}
 
+				final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+				allTerminal.thenAccept(new AcceptFunction<Void>() {
+					@Override
+					public void accept(Void value) {
+						allVerticesInTerminalState();
+					}
+				});
+
 				return;
 			}
 
@@ -1039,7 +1070,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 						stateTimestamps[i] = 0;
 					}
 				}
-				numFinishedJobVertices = 0;
+
 				transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
 
 				// if we have checkpointed state, reload it into the executions
@@ -1097,9 +1128,24 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * For testing: This waits until the job execution has finished.
 	 */
 	public void waitUntilFinished() throws InterruptedException {
-		synchronized (progressLock) {
-			while (!state.isTerminalState()) {
-				progressLock.wait();
+		// we may need multiple attempts in the presence of failures / recovery
+		while (true) {
+			for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+				for (ExecutionVertex ev : ejv.getTaskVertices()) {
+					try {
+						ev.getCurrentExecutionAttempt().getTerminationFuture().get();
+					}
+					catch (ExecutionException e) {
+						// this should never happen
+						throw new RuntimeException(e);
+					}
+				}
+			}
+
+			// now that all vertices have been (at some point) in a terminal state,
+			// we need to check if the job as a whole has entered a final state
+			if (state.isTerminalState()) {
+				return;
 			}
 		}
 	}
@@ -1129,59 +1175,57 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	void jobVertexInFinalState() {
-		synchronized (progressLock) {
-			if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
-				throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
-			}
-
-			numFinishedJobVertices++;
+	void vertexFinished() {
+		int numFinished = verticesFinished.incrementAndGet();
+		if (numFinished == numVerticesTotal) {
+			// done :-)
+			allVerticesInTerminalState();
+		}
+	}
 
-			if (numFinishedJobVertices == verticesInCreationOrder.size()) {
+	void vertexUnFinished() {
+		verticesFinished.getAndDecrement();
+	}
 
-				// we are done, transition to the final state
-				JobStatus current;
-				while (true) {
-					current = this.state;
+	private void allVerticesInTerminalState() {
+		// we are done, transition to the final state
+		JobStatus current;
+		while (true) {
+			current = this.state;
 
-					if (current == JobStatus.RUNNING) {
-						if (transitionState(current, JobStatus.FINISHED)) {
-							postRunCleanup();
-							break;
-						}
-					}
-					else if (current == JobStatus.CANCELLING) {
-						if (transitionState(current, JobStatus.CANCELED)) {
-							postRunCleanup();
-							break;
-						}
-					}
-					else if (current == JobStatus.FAILING) {
-						if (tryRestartOrFail()) {
-							break;
-						}
-						// concurrent job status change, let's check again
-					}
-					else if (current == JobStatus.SUSPENDED) {
-						// we've already cleaned up when entering the SUSPENDED state
-						break;
-					}
-					else if (current.isGloballyTerminalState()) {
-						LOG.warn("Job has entered globally terminal state without waiting for all " +
-							"job vertices to reach final state.");
-						break;
-					}
-					else {
-						fail(new Exception("ExecutionGraph went into final state from state " + current));
-						break;
-					}
+			if (current == JobStatus.RUNNING) {
+				if (transitionState(current, JobStatus.FINISHED)) {
+					postRunCleanup();
+					break;
 				}
-				// done transitioning the state
-
-				// also, notify waiters
-				progressLock.notifyAll();
+			}
+			else if (current == JobStatus.CANCELLING) {
+				if (transitionState(current, JobStatus.CANCELED)) {
+					postRunCleanup();
+					break;
+				}
+			}
+			else if (current == JobStatus.FAILING) {
+				if (tryRestartOrFail()) {
+					break;
+				}
+				// concurrent job status change, let's check again
+			}
+			else if (current == JobStatus.SUSPENDED) {
+				// we've already cleaned up when entering the SUSPENDED state
+				break;
+			}
+			else if (current.isGloballyTerminalState()) {
+				LOG.warn("Job has entered globally terminal state without waiting for all " +
+						"job vertices to reach final state.");
+				break;
+			}
+			else {
+				fail(new Exception("ExecutionGraph went into final state from state " + current));
+				break;
 			}
 		}
+		// done transitioning the state
 	}
 
 	/**
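
The cancellation and failing paths above share one pattern: collect the per-task termination futures and act only once the conjunct future fires. A condensed sketch of that pattern, using only the APIs visible in this diff (the vertices parameter and the onAllTerminal callback are placeholders):

	void cancelAllAndThen(Iterable<ExecutionVertex> vertices, final Runnable onAllTerminal) {
		final ArrayList<Future<?>> cancellations = new ArrayList<>();

		for (ExecutionVertex vertex : vertices) {
			// cancel() now returns the termination future of the current execution attempt
			cancellations.add(vertex.cancel());
		}

		// the conjunct future completes only once every task has reached a terminal state
		FutureUtils.combineAll(cancellations).thenAccept(new AcceptFunction<Void>() {
			@Override
			public void accept(Void ignored) {
				onAllTerminal.run();
			}
		});
	}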

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2e5de64..3197e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,9 +66,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public static final int VALUE_NOT_SET = -1;
 
 	private final Object stateMonitor = new Object();
-	
+
 	private final ExecutionGraph graph;
-	
+
 	private final JobVertex jobVertex;
 
 	/**
@@ -91,12 +92,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	private final ExecutionVertex[] taskVertices;
 
 	private final IntermediateResult[] producedDataSets;
-	
+
 	private final List<IntermediateResult> inputs;
-	
-	private final int parallelism;
 
-	private final boolean[] finishedSubtasks;
+	private final int parallelism;
 
 	private final SlotSharingGroup slotSharingGroup;
 
@@ -108,8 +107,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	private int maxParallelism;
 
-	private volatile int numSubtasksInFinalState;
-
 	/**
 	 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 	 * serialize the same information multiple times in order to create the
@@ -231,8 +228,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		catch (Throwable t) {
 			throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
 		}
-		
-		finishedSubtasks = new boolean[parallelism];
 	}
 
 	/**
@@ -360,10 +355,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return serializedTaskInformation;
 	}
 
-	public boolean isInFinalState() {
-		return numSubtasksInFinalState == parallelism;
-	}
-
 	@Override
 	public ExecutionState getAggregateState() {
 		int[] num = new int[ExecutionState.values().length];
@@ -484,51 +475,51 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return slots;
 	}
 
+	/**
+	 * Cancels all currently running vertex executions.
+	 */
 	public void cancel() {
 		for (ExecutionVertex ev : getTaskVertices()) {
 			ev.cancel();
 		}
 	}
-	
-	public void fail(Throwable t) {
+
+	/**
+	 * Cancels all currently running vertex executions.
+	 * 
+	 * @return A future that is complete once all tasks have canceled.
+	 */
+	public Future<Void> cancelWithFuture() {
+		// we collect all futures from the task cancellations
+		ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
+
+		// cancel each vertex
 		for (ExecutionVertex ev : getTaskVertices()) {
-			ev.fail(t);
+			futures.add(ev.cancel());
 		}
+
+		// return a conjunct future, which is complete once all individual tasks are canceled
+		return FutureUtils.combineAll(futures);
 	}
-	
-	public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
-		synchronized (stateMonitor) {
-			while (numSubtasksInFinalState < parallelism) {
-				stateMonitor.wait();
-			}
+
+	public void fail(Throwable t) {
+		for (ExecutionVertex ev : getTaskVertices()) {
+			ev.fail(t);
 		}
 	}
-	
+
 	public void resetForNewExecution() {
-		if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
-			throw new IllegalStateException("Cannot reset vertex that is not in final state");
-		}
-		
+
 		synchronized (stateMonitor) {
 			// check and reset the sharing groups with scheduler hints
 			if (slotSharingGroup != null) {
 				slotSharingGroup.clearTaskAssignment();
 			}
-			
-			// reset vertices one by one. if one reset fails, the "vertices in final state"
-			// fields will be consistent to handle triggered cancel calls
+
 			for (int i = 0; i < parallelism; i++) {
 				taskVertices[i].resetForNewExecution();
-				if (finishedSubtasks[i]) {
-					finishedSubtasks[i] = false;
-					numSubtasksInFinalState--;
-				}
-			}
-			
-			if (numSubtasksInFinalState != 0) {
-				throw new RuntimeException("Bug: resetting the execution job vertex failed.");
 			}
-			
+
 			// set up the input splits again
 			try {
 				if (this.inputSplits != null) {
@@ -548,51 +539,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			}
 		}
 	}
-	
-	//---------------------------------------------------------------------------------------------
-	//  Notifications
-	//---------------------------------------------------------------------------------------------
-	
-	void vertexFinished(int subtask) {
-		subtaskInFinalState(subtask);
-	}
-	
-	void vertexCancelled(int subtask) {
-		subtaskInFinalState(subtask);
-	}
-	
-	void vertexFailed(int subtask, Throwable error) {
-		subtaskInFinalState(subtask);
-	}
-	
-	private void subtaskInFinalState(int subtask) {
-		synchronized (stateMonitor) {
-			if (!finishedSubtasks[subtask]) {
-				finishedSubtasks[subtask] = true;
-				
-				if (numSubtasksInFinalState+1 == parallelism) {
-					
-					// call finalizeOnMaster hook
-					try {
-						getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
-					}
-					catch (Throwable t) {
-						getGraph().fail(t);
-					}
-
-					numSubtasksInFinalState++;
-					
-					// we are in our final state
-					stateMonitor.notifyAll();
-					
-					// tell the graph
-					graph.jobVertexInFinalState();
-				} else {
-					numSubtasksInFinalState++;
-				}
-			}
-		}
-	}
 
 	// --------------------------------------------------------------------------------------------
 	//  Accumulators / Metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3f6ce88..bcf7a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -60,8 +61,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 /**
@@ -509,30 +508,41 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//   Actions
 	// --------------------------------------------------------------------------------------------
 
-	public void resetForNewExecution() {
+	public Execution resetForNewExecution() {
 
 		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
-			Execution execution = currentExecution;
-			ExecutionState state = execution.getState();
+			final Execution oldExecution = currentExecution;
+			final ExecutionState oldState = oldExecution.getState();
 
-			if (state == FINISHED || state == CANCELED || state == FAILED) {
-				priorExecutions.add(execution);
-				currentExecution = new Execution(
+			if (oldState.isTerminal()) {
+				priorExecutions.add(oldExecution);
+
+				final Execution newExecution = new Execution(
 					getExecutionGraph().getFutureExecutor(),
 					this,
-					execution.getAttemptNumber()+1,
+						oldExecution.getAttemptNumber()+1,
 					System.currentTimeMillis(),
 					timeout);
 
+				this.currentExecution = newExecution;
+
 				CoLocationGroup grp = jobVertex.getCoLocationGroup();
 				if (grp != null) {
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
+
+				// if the execution was 'FINISHED' before, tell the ExecutionGraph that
+				// we take one step back on the road to reaching global FINISHED
+				if (oldState == FINISHED) {
+					getExecutionGraph().vertexUnFinished();
+				}
+
+				return newExecution;
 			}
 			else {
-				throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
+				throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
 			}
 		}
 	}
@@ -545,8 +555,16 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		this.currentExecution.deployToSlot(slot);
 	}
 
-	public void cancel() {
-		this.currentExecution.cancel();
+	/**
+	 * Cancels the currently running execution of this ExecutionVertex.
+	 * @return A future that completes once the execution has reached its final state.
+	 */
+	public Future<ExecutionState> cancel() {
+		// to avoid any case of mixup in the presence of concurrent calls,
+		// we copy a reference to the stack to make sure both calls go to the same Execution 
+		final Execution exec = this.currentExecution;
+		exec.cancel();
+		return exec.getTerminationFuture();
 	}
 
 	public void stop() {
@@ -621,16 +639,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//   Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
 
-	void executionFinished() {
-		jobVertex.vertexFinished(subTaskIndex);
+	void executionFinished(Execution execution) {
+		if (execution == currentExecution) {
+			getExecutionGraph().vertexFinished();
+		}
 	}
 
-	void executionCanceled() {
-		jobVertex.vertexCancelled(subTaskIndex);
+	void executionCanceled(Execution execution) {
+		// nothing to do
 	}
 
-	void executionFailed(Throwable t) {
-		jobVertex.vertexFailed(subTaskIndex, t);
+	void executionFailed(Execution execution, Throwable cause) {
+		// nothing to do
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1729582..1ebfcac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -60,12 +60,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
+
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -288,48 +288,58 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testCancelWhileFailing() throws Exception {
-		// We want to manually control the restart and delay
-		RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
-		ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
-		Instance instance = executionGraphInstanceTuple.f1;
-		doNothing().when(executionGraph).jobVertexInFinalState();
+		final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
+		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
 
-		// Kill the instance...
-		instance.markDead();
+		assertEquals(JobStatus.RUNNING, graph.getState());
 
-		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
+		// switch all tasks to running
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().switchToRunning();
+		}
 
-		// ...and wait for all vertices to be in state FAILED. The
-		// jobVertexInFinalState does nothing, that's why we don't wait on the
-		// job status.
-		boolean success = false;
-		while (deadline.hasTimeLeft() && !success) {
-			success = true;
-			for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-				ExecutionState state = vertex.getExecutionState();
-				if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-					success = false;
-					Thread.sleep(100);
-					break;
-				}
-			}
+		graph.fail(new Exception("test"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+
+		graph.cancel();
+
+		assertEquals(JobStatus.CANCELLING, graph.getState());
+
+		// let all tasks finish cancelling
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
 		}
 
-		// Still in failing
-		assertEquals(JobStatus.FAILING, executionGraph.getState());
+		assertEquals(JobStatus.CANCELED, graph.getState());
+	}
 
-		// The cancel call needs to change the state to CANCELLING
-		executionGraph.cancel();
+	@Test
+	public void testFailWhileCanceling() throws Exception {
+		final RestartStrategy restartStrategy = new NoRestartStrategy();
+		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
 
-		assertEquals(JobStatus.CANCELLING, executionGraph.getState());
+		assertEquals(JobStatus.RUNNING, graph.getState());
 
-		// Unspy and finalize the job state
-		doCallRealMethod().when(executionGraph).jobVertexInFinalState();
+		// switch all tasks to running
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().switchToRunning();
+		}
 
-		executionGraph.jobVertexInFinalState();
+		graph.cancel();
 
-		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+		assertEquals(JobStatus.CANCELLING, graph.getState());
+
+		graph.fail(new Exception("test"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+
+		// let all tasks finish cancelling
+		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		assertEquals(JobStatus.FAILED, graph.getState());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b0137fa..73838dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -52,9 +51,10 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
-import org.mockito.Matchers;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 
@@ -198,10 +198,6 @@ public class ExecutionGraphTestUtils {
 			}
 		};
 
-		doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt());
-		doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class));
-		doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt());
-
 		return ejv;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
deleted file mode 100644
index bd51c81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-	@Test
-	public void testAccumulatedStateFinished() {
-		try {
-			final JobID jid = new JobID();
-			final JobVertexID vid = new JobVertexID();
-
-			JobVertex ajv = new JobVertex("TestVertex", vid);
-			ajv.setParallelism(3);
-			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				jid, 
-				"test job", 
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy(),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
-			graph.attachJobGraph(Collections.singletonList(ajv));
-
-			setGraphStatus(graph, JobStatus.RUNNING);
-
-			ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-			// mock resources and mock taskmanager
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				SimpleSlot slot = getInstance(
-					new ActorTaskManagerGateway(
-						new SimpleActorGateway(
-							TestingUtils.defaultExecutionContext()))
-				).allocateSimpleSlot(jid);
-				ee.deployToSlot(slot);
-			}
-
-			// finish all
-			for (ExecutionVertex ee : ejv.getTaskVertices()) {
-				ee.executionFinished();
-			}
-
-			assertTrue(ejv.isInFinalState());
-			assertEquals(JobStatus.FINISHED, graph.getState());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0061272/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
deleted file mode 100644
index d717986..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.*;
-
-public class TerminalStateDeadlockTest {
-
-	private final Field stateField;
-	private final Field resourceField;
-	private final Field execGraphStateField;
-	private final Field execGraphSlotProviderField;
-
-	private final SimpleSlot resource;
-
-
-	public TerminalStateDeadlockTest() {
-		try {
-			// the reflection fields to access the private fields
-			this.stateField = Execution.class.getDeclaredField("state");
-			this.stateField.setAccessible(true);
-
-			this.resourceField = Execution.class.getDeclaredField("assignedResource");
-			this.resourceField.setAccessible(true);
-
-			this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
-			this.execGraphStateField.setAccessible(true);
-
-			this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
-			this.execGraphSlotProviderField.setAccessible(true);
-			
-			// the dummy resource
-			ResourceID resourceId = ResourceID.generate();
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
-				
-			HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
-			Instance instance = new Instance(
-				new ActorTaskManagerGateway(DummyActorGateway.INSTANCE), ci, new InstanceID(), resources, 4);
-
-			this.resource = instance.allocateSimpleSlot(new JobID());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-			
-			// silence the compiler
-			throw new RuntimeException();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testProvokeDeadlock() {
-		try {
-			final JobID jobId = resource.getJobID();
-			final JobVertexID vid1 = new JobVertexID();
-			final JobVertexID vid2 = new JobVertexID();
-			
-			final List<JobVertex> vertices;
-			{
-				JobVertex v1 = new JobVertex("v1", vid1);
-				JobVertex v2 = new JobVertex("v2", vid2);
-				v1.setParallelism(1);
-				v2.setParallelism(1);
-				v1.setInvokableClass(DummyInvokable.class);
-				v2.setInvokableClass(DummyInvokable.class);
-				vertices = Arrays.asList(v1, v2);
-			}
-			
-			final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-			
-			final Executor executor = Executors.newFixedThreadPool(4);
-			
-			// try a lot!
-			for (int i = 0; i < 20000; i++) {
-				final TestExecGraph eg = new TestExecGraph(jobId);
-				eg.attachJobGraph(vertices);
-
-				final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
-				final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
-
-				initializeExecution(e1);
-				initializeExecution(e2);
-
-				execGraphStateField.set(eg, JobStatus.FAILING);
-				execGraphSlotProviderField.set(eg, scheduler);
-				
-				Runnable r1 = new Runnable() {
-					@Override
-					public void run() {
-						e1.cancelingComplete();
-					}
-				};
-				Runnable r2 = new Runnable() {
-					@Override
-					public void run() {
-						e2.cancelingComplete();
-					}
-				};
-				
-				executor.execute(r1);
-				executor.execute(r2);
-				
-				eg.waitTillDone();
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private void initializeExecution(Execution exec) throws IllegalAccessException {
-		// set state to canceling
-		stateField.set(exec, ExecutionState.CANCELING);
-		
-		// assign a resource
-		resourceField.set(exec, resource);
-	}
-	
-	
-	static class TestExecGraph extends ExecutionGraph {
-
-		private static final Configuration EMPTY_CONFIG = new Configuration();
-
-		private static final Time TIMEOUT = Time.seconds(30L);
-
-		private volatile boolean done;
-
-		TestExecGraph(JobID jobId) throws IOException {
-			super(
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				jobId,
-				"test graph",
-				EMPTY_CONFIG,
-				new SerializedValue<>(new ExecutionConfig()),
-				TIMEOUT,
-				new FixedDelayRestartStrategy(1, 0),
-				new Scheduler(TestingUtils.defaultExecutionContext()));
-		}
-
-		@Override
-		public void scheduleForExecution() {
-			// notify that we are done with the "restarting"
-			synchronized (this) {
-				done = true;
-				this.notifyAll();
-			}
-		}
-
-		public void waitTillDone() {
-			try {
-				synchronized (this) {
-					while (!done) {
-						this.wait();
-					}
-				}
-			}
-			catch (InterruptedException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-}


[2/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b01ddc0..479ec51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1345,7 +1345,7 @@ class JobManager(
           currentJobs.remove(jobId)
 
           if (executionGraph != null) {
-            executionGraph.fail(t)
+            executionGraph.failGlobal(t)
           }
 
           val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
@@ -1426,7 +1426,7 @@ class JobManager(
           }
         } catch {
           case t: Throwable => try {
-            executionGraph.fail(t)
+            executionGraph.failGlobal(t)
           } catch {
             case tt: Throwable =>
               log.error("Error while marking ExecutionGraph as failed.", tt)
@@ -1837,7 +1837,7 @@ class JobManager(
         job =>
           future {
             // Fail the execution graph
-            job._1.fail(new IllegalStateException("Another JobManager removed the job from " +
+            job._1.failGlobal(new IllegalStateException("Another JobManager removed the job from " +
               "ZooKeeper."))
           }(context.dispatcher)
       )
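
Note on the rename above: failGlobal(Throwable) is the former fail(Throwable) entry point, renamed to make explicit that it fails the whole job rather than going through the new per-task FailoverStrategy added by this commit. A minimal Java sketch of the call, using test helpers that appear elsewhere in this patch (illustrative only, not part of the diff):

    // sketch: triggering a global failure against a simple test graph
    ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
    graph.scheduleForExecution();

    // failGlobal() fails every execution; local, strategy-driven recovery is not consulted here
    graph.failGlobal(new Exception("TaskManager lost"));
    assertEquals(JobStatus.FAILING, graph.getState());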

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 41b0e35..9250634 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -3135,6 +3135,7 @@ public class CheckpointCoordinatorTest {
 			vertex,
 			1,
 			1L,
+			1L,
 			Time.milliseconds(500L)
 		));
 		when(exec.getAttemptId()).thenReturn(attemptID);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 2e0bd76..99a3815 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -90,7 +90,7 @@ public class CoordinatorShutdownTest {
 
 			FailingBlockingInvokable.unblock();
 
-			graph.waitUntilFinished();
+			graph.waitUntilTerminal();
 			
 			// verify that the coordinator was shut down
 			CheckpointCoordinator coord = graph.getCheckpointCoordinator();
@@ -149,7 +149,7 @@ public class CoordinatorShutdownTest {
 
 			BlockingInvokable.unblock();
 			
-			graph.waitUntilFinished();
+			graph.waitUntilTerminal();
 
 			// verify that the coordinator was shut down
 			CheckpointCoordinator coord = graph.getCheckpointCoordinator();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 1f038bd..a524e5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -56,7 +57,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 		CompletedCheckpointStore store = mock(CompletedCheckpointStore.class);
 
 		ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
-		graph.fail(new Exception("Test Exception"));
+		graph.failGlobal(new Exception("Test Exception"));
 
 		verify(counter, times(1)).shutdown(JobStatus.FAILED);
 		verify(store, times(1)).shutdown(eq(JobStatus.FAILED));
@@ -91,6 +92,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			new SerializedValue<>(new ExecutionConfig()),
 			Time.days(1L),
 			new NoRestartStrategy(),
+			new RestartAllStrategy.Factory(),
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
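
The added constructor argument is the new FailoverStrategy.Factory; RestartAllStrategy presumably mirrors the previous behavior of restarting the full graph on any task failure. A brief sketch of the two pluggable policies, offered as an assumption-labeled illustration rather than the authoritative signature:

    // sketch: the ExecutionGraph is now configured with two separate policies
    RestartStrategy restartStrategy = new NoRestartStrategy();                   // whether / when the job may restart
    FailoverStrategy.Factory failoverFactory = new RestartAllStrategy.Factory(); // which executions to restart on a task failure

    // the factory is handed to the ExecutionGraph constructor right after the restart
    // strategy, as shown in the hunk above; that RestartAllStrategy keeps the pre-FLIP-1
    // behavior of restarting all tasks is an assumption based on the class name and the commit title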

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 866f55c..a739e14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -24,11 +24,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertNotEquals;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -342,50 +340,6 @@ public class ExecutionGraphDeploymentTest {
 	}
 
 	@Test
-	public void testRegistrationOfExecutionsFailingFinalize() {
-		try {
-
-			final JobVertexID jid1 = new JobVertexID();
-			final JobVertexID jid2 = new JobVertexID();
-
-			JobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
-			JobVertex v2 = new JobVertex("v2", jid2);
-
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4).f1;
-
-			List<Execution> execList = new ArrayList<Execution>();
-			execList.addAll(executions.values());
-			// sort executions by job vertex. Failing job vertex first
-			Collections.sort(execList, new Comparator<Execution>() {
-				@Override
-				public int compare(Execution o1, Execution o2) {
-					return o1.getVertex().getTaskNameWithSubtaskIndex().compareTo(o2.getVertex().getTaskNameWithSubtaskIndex());
-				}
-			});
-
-			int cnt = 0;
-			for (Execution e : execList) {
-				cnt++;
-				e.markFinished();
-				if (cnt <= 6) {
-					// the last execution of the first job vertex triggers the failing finalize hook
-					assertEquals(ExecutionState.FINISHED, e.getState());
-				}
-				else {
-					// all following executions should be canceled
-					assertEquals(ExecutionState.CANCELED, e.getState());
-				}
-			}
-
-			assertEquals(0, executions.size());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
 	/**
 	 * Tests that a blocking batch job fails if there are not enough resources left to schedule the
 	 * succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks
@@ -473,24 +427,6 @@ public class ExecutionGraphDeploymentTest {
 			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
-	@Test
-	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
-
-		final int negativeMaxNumberOfCheckpointsToRetain = -10;
-
-		final Configuration jobManagerConfig = new Configuration();
-		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
-			negativeMaxNumberOfCheckpointsToRetain);
-
-		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
-
-		assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
-			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-
-		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
-			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-	}
-
 	private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
 
@@ -503,24 +439,24 @@ public class ExecutionGraphDeploymentTest {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		for (int i = 0; i < dop1 + dop2; i++) {
 			scheduler.newInstanceAvailable(
-				ExecutionGraphTestUtils.getInstance(
-					new ActorTaskManagerGateway(
-						new ExecutionGraphTestUtils.SimpleActorGateway(
-							TestingUtils.directExecutionContext()))));
+					ExecutionGraphTestUtils.getInstance(
+							new ActorTaskManagerGateway(
+									new ExecutionGraphTestUtils.SimpleActorGateway(
+											TestingUtils.directExecutionContext()))));
 		}
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-			new DirectScheduledExecutorService(),
-			TestingUtils.defaultExecutor(),
-			jobId, 
-			"some job", 
-			new Configuration(), 
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			scheduler);
-		
+				new DirectScheduledExecutorService(),
+				TestingUtils.defaultExecutor(),
+				jobId,
+				"some job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy(),
+				scheduler);
+
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
@@ -537,13 +473,27 @@ public class ExecutionGraphDeploymentTest {
 		return new Tuple2<>(eg, executions);
 	}
 
+	@Test
+	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
+
+		final int negativeMaxNumberOfCheckpointsToRetain = -10;
+
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+			negativeMaxNumberOfCheckpointsToRetain);
+
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+	}
+
 	@SuppressWarnings("serial")
 	public static class FailingFinalizeJobVertex extends JobVertex {
 
-		public FailingFinalizeJobVertex(String name) {
-			super(name);
-		}
-
 		public FailingFinalizeJobVertex(String name, JobVertexID id) {
 			super(name, id);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 97127c7..6b5ceae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -52,9 +51,7 @@ import org.junit.Test;
 import org.mockito.Matchers;
 
 import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -133,10 +130,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 				new SerializedValue<ExecutionConfig>(null),
 				timeout,
 				testingRestartStrategy,
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				scheduler,
-				getClass().getClassLoader());
+				scheduler);
 
 			RestartTimeGauge restartingTime = new RestartTimeGauge(executionGraph);
 
@@ -242,8 +236,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
 			// for this to work, we have to use a SuppressRestartException
-			executionGraph.fail(new SuppressRestartsException(new Exception()));
-
+			executionGraph.failGlobal(new SuppressRestartsException(new Exception()));
+	
 			assertEquals(JobStatus.FAILED, executionGraph.getState());
 
 			previousRestartingTime = restartingTime.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1ebfcac..eeb6c69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -66,9 +66,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
+
 
 public class ExecutionGraphRestartTest extends TestLogger {
 
@@ -271,12 +270,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
 		// The restarting should not fail with an ordinary exception
-		executionGraph.fail(new Exception("Test exception"));
+		executionGraph.failGlobal(new Exception("Test exception"));
 
 		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
 		// but it should fail when sending a SuppressRestartsException
-		executionGraph.fail(new SuppressRestartsException(new Exception("Test exception")));
+		executionGraph.failGlobal(new SuppressRestartsException(new Exception("Test exception")));
 
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 
@@ -298,7 +297,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			vertex.getCurrentExecutionAttempt().switchToRunning();
 		}
 
-		graph.fail(new Exception("test"));
+		graph.failGlobal(new Exception("test"));
 
 		assertEquals(JobStatus.FAILING, graph.getState());
 
@@ -330,7 +329,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.CANCELLING, graph.getState());
 
-		graph.fail(new Exception("test"));
+		graph.failGlobal(new Exception("test"));
 
 		assertEquals(JobStatus.FAILING, graph.getState());
 
@@ -344,8 +343,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testNoRestartOnSuppressException() throws Exception {
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
+		final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
 
 		// Fail with unrecoverable Exception
 		eg.getAllExecutionVertices().iterator().next().fail(
@@ -357,21 +355,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			vertex.getCurrentExecutionAttempt().cancelingComplete();
 		}
 
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
-
-		// Wait for async restart
-		Deadline deadline = timeout.fromNow();
-		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) {
-			Thread.sleep(100);
-		}
-
+		eg.waitUntilTerminal();
 		assertEquals(JobStatus.FAILED, eg.getState());
 
-		// No restart
-		verify(eg, never()).restart();
-
 		RestartStrategy restartStrategy = eg.getRestartStrategy();
-
 		assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
 
 		assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
@@ -443,7 +430,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	/**
 	 * Tests that a graph is not restarted after cancellation via a call to
-	 * {@link ExecutionGraph#fail(Throwable)}. This can happen when a slot is
+	 * {@link ExecutionGraph#failGlobal(Throwable)}. This can happen when a slot is
 	 * released concurrently with cancellation.
 	 */
 	@Test
@@ -490,7 +477,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	/**
 	 * Tests that it is possible to fail a graph via a call to
-	 * {@link ExecutionGraph#fail(Throwable)} after cancellation.
+	 * {@link ExecutionGraph#failGlobal(Throwable)} after cancellation.
 	 */
 	@Test
 	public void testFailExecutionGraphAfterCancel() throws Exception {
@@ -523,7 +510,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		eg.cancel();
 		assertEquals(JobStatus.CANCELLING, eg.getState());
 
-		eg.fail(new Exception("Test Exception"));
+		eg.failGlobal(new Exception("Test Exception"));
 		assertEquals(JobStatus.FAILING, eg.getState());
 
 		Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
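
The same simplification appears a few hunks earlier in testNoRestartOnSuppressException: the hand-rolled deadline/polling loop is replaced by the new blocking helper. Condensed before/after sketch (illustrative):

    // before: poll the job status with a deadline
    Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
    while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) {
        Thread.sleep(100);
    }

    // after: block until the ExecutionGraph reaches a terminal state
    eg.waitUntilTerminal();
    assertEquals(JobStatus.FAILED, eg.getState());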

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index c8a0422..1eecd4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -61,6 +61,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -300,8 +301,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		TerminalJobStatusListener testListener = new TerminalJobStatusListener();
-		eg.registerJobStatusListener(testListener);
 
 		//
 		//  we complete some of the futures
@@ -322,7 +321,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		sourceFutures[1].completeExceptionally(new TestRuntimeException());
 
 		// wait until the job failed as a whole
-		testListener.waitForTerminalState(2000);
+		eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
 
 		// wait until all slots are back
 		verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class));
@@ -371,8 +370,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(vertex.getID(), slotFutures);
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider, Time.milliseconds(20));
-		final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
-		eg.registerJobStatusListener(statusListener);
 
 		//  we complete one future
 		slotFutures[1].complete(slots[1]);
@@ -388,7 +385,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		// since future[0] is still missing the while operation must time out
 		// we have no restarts allowed, so the job will go terminal
-		statusListener.waitForTerminalState(2000);
+		eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
 
 		// wait until all slots are back
 		verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class));
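
Both hunks in this file replace the TerminalJobStatusListener with the graph's own termination future. Sketch of the new waiting pattern (illustrative, relying on the TimeUnit import added above):

    // wait (with a timeout) until the job reaches a terminal state, then run the usual assertions
    eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
    // at this point the job is terminal and the returned slots can be verified as before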

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
deleted file mode 100644
index 27844c1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.api.mockito.PowerMockito;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.same;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ExecutionGraph.class)
-public class ExecutionGraphSignalsTest {
-	private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
-	private int[] dop = new int[] { 5, 7, 2, 11, 4 };
-	private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
-	private ExecutionGraph eg;
-	private Field f;
-
-	@Before
-	public void prepare() throws Exception {
-		final JobID jobId = new JobID();
-		final String jobName = "Test Job Sample Name";
-		final Configuration cfg = new Configuration();
-
-
-		assert (mockEJV.length == 5);
-		JobVertex v1 = new JobVertex("vertex1");
-		JobVertex v2 = new JobVertex("vertex2");
-		JobVertex v3 = new JobVertex("vertex3");
-		JobVertex v4 = new JobVertex("vertex4");
-		JobVertex v5 = new JobVertex("vertex5");
-
-		for(int i = 0; i < mockEJV.length; ++i) {
-			mockEJV[i] = mock(ExecutionJobVertex.class);
-
-			this.mockEV[i] = new ExecutionVertex[dop[i]];
-			for (int j = 0; j < dop[i]; ++j) {
-				this.mockEV[i][j] = mock(ExecutionVertex.class);
-			}
-
-			when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
-			when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
-		}
-
-		PowerMockito
-			.whenNew(ExecutionJobVertex.class)
-			.withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
-				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
-		PowerMockito
-			.whenNew(ExecutionJobVertex.class)
-			.withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
-				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
-		PowerMockito
-			.whenNew(ExecutionJobVertex.class)
-			.withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
-				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
-		PowerMockito
-			.whenNew(ExecutionJobVertex.class)
-			.withArguments(any(ExecutionGraph.class), same(v4), any(Integer.class).intValue(),
-				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[3]);
-		PowerMockito
-			.whenNew(ExecutionJobVertex.class)
-			.withArguments(any(ExecutionGraph.class), same(v5), any(Integer.class).intValue(),
-				any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[4]);
-
-		v1.setParallelism(dop[0]);
-		v2.setParallelism(dop[1]);
-		v3.setParallelism(dop[2]);
-		v4.setParallelism(dop[3]);
-		v5.setParallelism(dop[4]);
-
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-		mockNumberOfInputs(1,0);
-		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-		mockNumberOfInputs(3,1);
-		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-		mockNumberOfInputs(3,2);
-		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-		mockNumberOfInputs(4,3);
-		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-		mockNumberOfInputs(4,2);
-
-		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
-
-		eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			jobId,
-			jobName,
-			cfg,
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new Scheduler(TestingUtils.defaultExecutionContext()));
-		eg.attachJobGraph(ordered);
-
-		f = eg.getClass().getDeclaredField("state");
-		f.setAccessible(true);
-	}
-
-	private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) {
-		for(int j = 0; j < dop[nodeIndex]; ++j) {
-			when(mockEV[nodeIndex][j].getNumberOfInputs()).thenReturn(dop[predecessorIndex]);
-		}
-	}
-
-	@Test
-	public void testCancel() throws Exception {
-		assertEquals(JobStatus.CREATED, eg.getState());
-		eg.cancel();
-
-		verifyCancel(1);
-
-		f.set(eg, JobStatus.RUNNING);
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.CANCELLING, eg.getState());
-
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.CANCELLING, eg.getState());
-
-		f.set(eg, JobStatus.CANCELED);
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.CANCELED, eg.getState());
-
-		f.set(eg, JobStatus.FAILED);
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.FAILED, eg.getState());
-
-		f.set(eg, JobStatus.FAILING);
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.CANCELLING, eg.getState());
-
-		f.set(eg, JobStatus.FINISHED);
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.FINISHED, eg.getState());
-
-		f.set(eg, JobStatus.RESTARTING);
-		eg.cancel();
-
-		verifyCancel(2);
-		assertEquals(JobStatus.CANCELED, eg.getState());
-	}
-
-	private void verifyCancel(int times) {
-		for (int i = 0; i < mockEJV.length; ++i) {
-			verify(mockEJV[i], times(times)).cancel();
-		}
-	}
-
-	/**
-	 * Tests that suspend cancels the ExecutionJobVertices and transitions to SUSPENDED state.
-	 * Tests also that one cannot leave the SUSPENDED state to enter a terminal state.
-	 */
-	@Test
-	public void testSuspend() throws Exception {
-		assertEquals(JobStatus.CREATED, eg.getState());
-		Exception testException = new Exception("Test exception");
-
-		eg.suspend(testException);
-
-		verifyCancel(1);
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-		f.set(eg, JobStatus.RUNNING);
-
-		eg.suspend(testException);
-
-		verifyCancel(2);
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-		f.set(eg, JobStatus.FAILING);
-
-		eg.suspend(testException);
-
-		verifyCancel(3);
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-		f.set(eg, JobStatus.CANCELLING);
-
-		eg.suspend(testException);
-
-		verifyCancel(4);
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-		f.set(eg, JobStatus.FAILED);
-
-		eg.suspend(testException);
-
-		verifyCancel(4);
-		assertEquals(JobStatus.FAILED, eg.getState());
-
-		f.set(eg, JobStatus.FINISHED);
-
-		eg.suspend(testException);
-
-		verifyCancel(4);
-		assertEquals(JobStatus.FINISHED, eg.getState());
-
-		f.set(eg, JobStatus.CANCELED);
-
-		eg.suspend(testException);
-
-		verifyCancel(4);
-		assertEquals(JobStatus.CANCELED, eg.getState());
-
-		f.set(eg, JobStatus.SUSPENDED);
-
-		eg.fail(testException);
-
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-		eg.cancel();
-
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-	}
-
-	// test that all source tasks receive STOP signal
-	// test that all non-source tasks do not receive STOP signal
-	@Test
-	public void testStop() throws Exception {
-		Field f = eg.getClass().getDeclaredField("isStoppable");
-		f.setAccessible(true);
-		f.set(eg, true);
-
-		eg.stop();
-
-		for (int i : new int[]{0,2}) {
-			for (int j = 0; j < mockEV[i].length; ++j) {
-				verify(mockEV[i][j], times(1)).stop();
-			}
-		}
-
-		for (int i : new int[]{1,3,4}) {
-			for (int j = 0; j < mockEV[i].length; ++j) {
-				verify(mockEV[i][j], times(0)).stop();
-			}
-		}
-	}
-
-	/**
-	 * Test that failing in state restarting will retrigger the restarting logic. This means that
-	 * it only goes into the state FAILED after the restart strategy says the job is no longer
-	 * restartable.
-	 */
-	@Test
-	public void testFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException, InterruptedException {
-		Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
-		restartStrategyField.setAccessible(true);
-
-		restartStrategyField.set(eg, new InfiniteDelayRestartStrategy(1));
-
-		f.set(eg, JobStatus.RESTARTING);
-
-		eg.fail(new Exception("Test"));
-
-		// we should restart since we have one restart attempt left
-		assertEquals(JobStatus.RESTARTING, eg.getState());
-
-		eg.fail(new Exception("Test"));
-
-		// after depleting all our restart attempts we should go into Failed
-		assertEquals(JobStatus.FAILED, eg.getState());
-	}
-
-	/**
-	 * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
-	 * immediately and sets the execution graph's state to FAILED.
-	 */
-	@Test
-	public void testSuppressRestartFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
-		Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
-		restartStrategyField.setAccessible(true);
-
-		restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
-
-		f.set(eg, JobStatus.RESTARTING);
-
-		// suppress a possible restart
-		eg.fail(new SuppressRestartsException(new Exception("Test")));
-
-		assertEquals(JobStatus.FAILED, eg.getState());
-	}
-
-	/**
-	 * Tests that we can suspend a job when in state RESTARTING.
-	 */
-	@Test
-	public void testSuspendWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
-		Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
-		restartStrategyField.setAccessible(true);
-
-		restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
-
-		f.set(eg, JobStatus.RESTARTING);
-
-		final Exception exception = new Exception("Suspended");
-
-		eg.suspend(exception);
-
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
-
-		assertEquals(exception, eg.getFailureCause());
-	}
-
-	// STOP only supported if all sources are stoppable 
-	@Test(expected = StoppingException.class)
-	public void testStopBatching() throws StoppingException {
-		eg.stop();
-	}
-
-	/**
-	 * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt
-	 * id, will not fail the execution graph.
-	 */
-	@Test
-	public void testFailingScheduleOrUpdateConsumers() throws IllegalAccessException {
-		IntermediateResultPartitionID intermediateResultPartitionId = new IntermediateResultPartitionID();
-		// The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call
-		// should fail
-		ExecutionAttemptID producerId = new ExecutionAttemptID();
-		ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId);
-
-		f.set(eg, JobStatus.RUNNING);
-
-		assertEquals(JobStatus.RUNNING, eg.getState());
-
-		try {
-			eg.scheduleOrUpdateConsumers(resultPartitionId);
-			fail("Expected ExecutionGraphException.");
-		} catch (ExecutionGraphException e) {
-			// we've expected this exception to occur
-		}
-
-		assertEquals(JobStatus.RUNNING, eg.getState());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
new file mode 100644
index 0000000..effe417
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Validates that stop() calls are handled correctly.
+ */
+public class ExecutionGraphStopTest extends TestLogger {
+
+	/**
+	 * Tests that STOP is only supported if all sources are stoppable.
+	 */
+	@Test
+	public void testStopIfSourcesNotStoppable() throws Exception {
+		final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
+
+		try {
+			graph.stop();
+			fail("exception expected");
+		}
+		catch (StoppingException e) {
+			// expected
+		}
+	}
+
+	/**
+	 * Validates that stop is only sent to the sources.
+	 *
+	 * This test builds a simple job with two sources and two non-source vertices.
+	 */
+	@Test
+	public void testStop() throws Exception {
+		final int sourcePar1 = 11;
+		final int sourcePar2 = 7;
+
+		final JobVertex source1 = new JobVertex("source 1");
+		source1.setInvokableClass(StoppableInvokable.class);
+		source1.setParallelism(sourcePar1);
+
+		final JobVertex source2 = new JobVertex("source 2");
+		source2.setInvokableClass(StoppableInvokable.class);
+		source2.setParallelism(sourcePar2);
+
+		final JobVertex nonSource1 = new JobVertex("non-source-1");
+		nonSource1.setInvokableClass(NoOpInvokable.class);
+		nonSource1.setParallelism(10);
+
+		final JobVertex nonSource2 = new JobVertex("non-source-2");
+		nonSource2.setInvokableClass(NoOpInvokable.class);
+		nonSource2.setParallelism(10);
+
+		nonSource1.connectNewDataSetAsInput(source1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		nonSource1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		nonSource2.connectNewDataSetAsInput(nonSource1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobID jid = new JobID();
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(
+				jid, source1, source2, nonSource1, nonSource2);
+
+		// we use different gateways for sources and non-sources to make sure the right ones
+		// get the RPC calls
+		final TaskManagerGateway sourceGateway = spy(new SimpleAckingTaskManagerGateway());
+		final TaskManagerGateway nonSourceGateway = spy(new SimpleAckingTaskManagerGateway());
+
+		// deploy source 1
+		for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+		}
+
+		// deploy source 2
+		for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
+			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+		}
+
+		// deploy non-source 1
+		for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+		}
+
+		// deploy non-source 2
+		for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
+			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
+			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+		}
+
+		eg.stop();
+
+		verify(sourceGateway, timeout(1000).times(sourcePar1 + sourcePar2)).stopTask(any(ExecutionAttemptID.class), any(Time.class));
+		verify(nonSourceGateway, times(0)).stopTask(any(ExecutionAttemptID.class), any(Time.class));
+
+		ExecutionGraphTestUtils.finishAllVertices(eg);
+	}
+
+	/**
+	 * Tests that the stopping RPC call is sent upon stopping requests.
+	 */
+	@Test
+	public void testStopRpc() throws Exception {
+		final JobID jid = new JobID();
+		final JobVertex vertex = new JobVertex("vertex");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(5);
+
+		final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph(jid, vertex);
+		final Execution exec = graph.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
+		when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
+				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+		when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
+				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
+
+		exec.deployToSlot(slot);
+		exec.switchToRunning();
+		assertEquals(ExecutionState.RUNNING, exec.getState());
+
+		exec.stop();
+		assertEquals(ExecutionState.RUNNING, exec.getState());
+
+		verify(gateway, times(1)).stopTask(any(ExecutionAttemptID.class), any(Time.class));
+
+		exec.markFinished();
+		assertEquals(ExecutionState.FINISHED, exec.getState());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
new file mode 100644
index 0000000..b3af005
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+/**
+ * Validates that suspending out of various states works correctly.
+ */
+public class ExecutionGraphSuspendTest {
+
+	/**
+	 * Going into SUSPENDED out of CREATED should immediately cancel everything and
+	 * not send out RPC calls.
+	 */
+	@Test
+	public void testSuspendedOutOfCreated() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		// suspend
+
+		eg.suspend(new Exception("suspend"));
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		validateAllVerticesInState(eg, ExecutionState.CANCELED);
+		validateCancelRpcCalls(gateway, 0);
+
+		ensureCannotLeaveSuspendedState(eg, gateway);
+	}
+
+	/**
+	 * Going into SUSPENDED out of DEPLOYING vertices should cancel all vertices once with RPC calls.
+	 */
+	@Test
+	public void testSuspendedOutOfDeploying() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		eg.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
+
+		// suspend
+
+		eg.suspend(new Exception("suspend"));
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		validateAllVerticesInState(eg, ExecutionState.CANCELING);
+		validateCancelRpcCalls(gateway, parallelism);
+
+		ensureCannotLeaveSuspendedState(eg, gateway);
+	}
+
+	/**
+	 * Going into SUSPENDED out of RUNNING vertices should cancel all vertices once with RPC calls.
+	 */
+	@Test
+	public void testSuspendedOutOfRunning() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		eg.scheduleForExecution();
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		validateAllVerticesInState(eg, ExecutionState.RUNNING);
+
+		// suspend
+
+		eg.suspend(new Exception("suspend"));
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		validateAllVerticesInState(eg, ExecutionState.CANCELING);
+		validateCancelRpcCalls(gateway, parallelism);
+
+		ensureCannotLeaveSuspendedState(eg, gateway);
+	}
+
+	/**
+	 * Suspending from FAILING goes to SUSPENDED and sends no additional RPC calls.
+	 */
+	@Test
+	public void testSuspendedOutOfFailing() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		eg.scheduleForExecution();
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.failGlobal(new Exception("fail global"));
+
+		assertEquals(JobStatus.FAILING, eg.getState());
+		validateCancelRpcCalls(gateway, parallelism);
+
+		// suspend
+		eg.suspend(new Exception("suspend"));
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+		ensureCannotLeaveSuspendedState(eg, gateway);
+	}
+
+	/**
+	 * Suspending from FAILED should do nothing.
+	 */
+	@Test
+	public void testSuspendedOutOfFailed() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		eg.scheduleForExecution();
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.failGlobal(new Exception("fail global"));
+
+		assertEquals(JobStatus.FAILING, eg.getState());
+		validateCancelRpcCalls(gateway, parallelism);
+
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+		assertEquals(JobStatus.FAILED, eg.getState());
+
+		// suspend
+		eg.suspend(new Exception("suspend"));
+
+		// still in failed state
+		assertEquals(JobStatus.FAILED, eg.getState());
+		validateCancelRpcCalls(gateway, parallelism);
+	}
+
+	/**
+	 * Suspending from CANCELING goes to SUSPENDED and sends no additional RPC calls. 
+	 */
+	@Test
+	public void testSuspendedOutOfCanceling() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		eg.scheduleForExecution();
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.cancel();
+
+		assertEquals(JobStatus.CANCELLING, eg.getState());
+		validateCancelRpcCalls(gateway, parallelism);
+
+		// suspend
+		eg.suspend(new Exception("suspend"));
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+		ensureCannotLeaveSuspendedState(eg, gateway);
+	}
+
+	/**
+	 * Suspending from CANCELLED should do nothing.
+	 */
+	@Test
+	public void testSuspendedOutOfCanceled() throws Exception {
+		final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+		final int parallelism = 10;
+		final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);
+
+		eg.scheduleForExecution();
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.cancel();
+
+		assertEquals(JobStatus.CANCELLING, eg.getState());
+		validateCancelRpcCalls(gateway, parallelism);
+
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+		assertEquals(JobStatus.CANCELED, eg.getState());
+
+		// suspend
+		eg.suspend(new Exception("suspend"));
+
+		// still in canceled state
+		assertEquals(JobStatus.CANCELED, eg.getState());
+		validateCancelRpcCalls(gateway, parallelism);
+	}
+
+	/**
+	 * Tests that we can suspend a job when in state RESTARTING.
+	 */
+	@Test
+	public void testSuspendWhileRestarting() throws Exception {
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
+		eg.scheduleForExecution();
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.failGlobal(new Exception("test"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		final Exception exception = new Exception("Suspended");
+
+		eg.suspend(exception);
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		assertEquals(exception, eg.getFailureCause());
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static void ensureCannotLeaveSuspendedState(ExecutionGraph eg, TaskManagerGateway gateway) {
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		reset(gateway);
+
+		eg.failGlobal(new Exception("fail"));
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		verifyNoMoreInteractions(gateway);
+
+		eg.cancel();
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		verifyNoMoreInteractions(gateway);
+
+		eg.suspend(new Exception("suspend again"));
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+		verifyNoMoreInteractions(gateway);
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			assertEquals(0, ev.getCurrentExecutionAttempt().getAttemptNumber());
+		}
+	}
+
+	private static void validateAllVerticesInState(ExecutionGraph eg, ExecutionState expected) {
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			assertEquals(expected, ev.getCurrentExecutionAttempt().getState());
+		}
+	}
+
+	private static void validateCancelRpcCalls(TaskManagerGateway gateway, int num) {
+		verify(gateway, times(num)).cancelTask(any(ExecutionAttemptID.class), any(Time.class));
+	}
+
+	private static ExecutionGraph createExecutionGraph(TaskManagerGateway gateway, int parallelism) throws Exception {
+		final JobID jobId = new JobID();
+
+		final JobVertex vertex = new JobVertex("vertex");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(parallelism);
+
+		final SlotProvider slotProvider = new SimpleSlotProvider(jobId, parallelism, gateway);
+
+		return ExecutionGraphTestUtils.createSimpleTestGraph(
+				jobId,
+				slotProvider,
+				new FixedDelayRestartStrategy(0, 0),
+				vertex);
+	}
+}
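
The suite above leans on a handful of Mockito idioms: spying a real gateway implementation, counting cancel RPCs with verify(..., times(n)), and combining reset(...) with verifyNoMoreInteractions(...) to prove that nothing else reaches the gateway once the job is SUSPENDED. A minimal, self-contained sketch of those idioms (generic names, not Flink classes) looks roughly like this:

	import static org.mockito.Mockito.anyString;
	import static org.mockito.Mockito.reset;
	import static org.mockito.Mockito.spy;
	import static org.mockito.Mockito.times;
	import static org.mockito.Mockito.verify;
	import static org.mockito.Mockito.verifyNoMoreInteractions;

	public class SpyVerificationSketch {

		/** Hypothetical stand-in for a task manager gateway; illustration only. */
		static class AckingGateway {
			void cancelTask(String executionAttemptId) {
				// a real gateway would send the cancel RPC and acknowledge it
			}
		}

		public static void main(String[] args) {
			AckingGateway gateway = spy(new AckingGateway());

			// the code under test issues two cancel RPCs against the spied gateway
			gateway.cancelTask("attempt-1");
			gateway.cancelTask("attempt-2");

			// assert that exactly two cancel RPCs were observed
			verify(gateway, times(2)).cancelTask(anyString());

			// forget the recorded interactions, then assert that nothing else arrives
			reset(gateway);
			verifyNoMoreInteractions(gateway);
		}
	}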

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 73838dc..0d7e389 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,51 +18,158 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
-import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * A collection of utility methods for testing the ExecutionGraph and its related classes. 
+ */
 public class ExecutionGraphTestUtils {
 
-	// --------------------------------------------------------------------------------------------
+	private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphTestUtils.class);
+
+	// ------------------------------------------------------------------------
+	//  reaching states
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Waits until the job has reached a certain state.
+	 * 
+	 * <p>This method is based on polling and might miss very fast state transitions!
+	 */
+	public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis) 
+			throws TimeoutException {
+
+		checkNotNull(eg);
+		checkNotNull(status);
+		checkArgument(maxWaitMillis >= 0);
+
+		// this is a poor implementation - we may want to improve it eventually
+		final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+		while (eg.getState() != status && System.nanoTime() < deadline) {
+			try { 
+				Thread.sleep(2);
+			} catch (InterruptedException ignored) {}
+		}
+
+		if (System.nanoTime() >= deadline) {
+			throw new TimeoutException();
+		}
+	}
+
+	/**
+	 * Waits until the Execution has reached a certain state.
+	 *
+	 * <p>This method is based on polling and might miss very fast state transitions!
+	 */
+	public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis)
+			throws TimeoutException {
+
+		checkNotNull(execution);
+		checkNotNull(state);
+		checkArgument(maxWaitMillis >= 0);
+
+		// this is a poor implementation - we may want to improve it eventually
+		final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+		while (execution.getState() != state && System.nanoTime() < deadline) {
+			try {
+				Thread.sleep(2);
+			} catch (InterruptedException ignored) {}
+		}
+
+		if (System.nanoTime() >= deadline) {
+			throw new TimeoutException();
+		}
+	}
+
+	/**
+	 * Takes all vertices in the given ExecutionGraph and switches their current
+	 * execution to RUNNING.
+	 */
+	public static void switchAllVerticesToRunning(ExecutionGraph eg) {
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().switchToRunning();
+		}
+	}
+
+	/**
+	 * Takes all vertices in the given ExecutionGraph and completes the cancellation
+	 * of their current execution, switching its state to CANCELED.
+	 */
+	public static void completeCancellingForAllVertices(ExecutionGraph eg) {
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+	}
+
+	/**
+	 * Takes all vertices in the given ExecutionGraph and marks their current
+	 * execution as FINISHED.
+	 */
+	public static void finishAllVertices(ExecutionGraph eg) {
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().markFinished();
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	//  state modifications
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 	
 	public static void setVertexState(ExecutionVertex vertex, ExecutionState state) {
 		try {
@@ -89,21 +196,105 @@ public class ExecutionGraphTestUtils {
 			throw new RuntimeException("Modifying the slot failed", e);
 		}
 	}
-	
-	public static void setGraphStatus(ExecutionGraph graph, JobStatus status) {
-		try {
-			Field f = ExecutionGraph.class.getDeclaredField("state");
-			f.setAccessible(true);
-			f.set(graph, status);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the status failed", e);
+
+	// ------------------------------------------------------------------------
+	//  Mocking Slots
+	// ------------------------------------------------------------------------
+
+	public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) {
+		final TaskManagerLocation location = new TaskManagerLocation(
+				ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
+
+		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+				new AllocationID(),
+				jid,
+				location,
+				0,
+				ResourceProfile.UNKNOWN,
+				gateway);
+
+		return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mocking ExecutionGraph
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an execution graph with one job vertex of parallelism 10 that does no restarts.
+	 */
+	public static ExecutionGraph createSimpleTestGraph() throws Exception {
+		return createSimpleTestGraph(new NoRestartStrategy());
+	}
+
+	/**
+	 * Creates an execution graph with one job vertex of parallelism 10, using the given
+	 * restart strategy.
+	 */
+	public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws Exception {
+		JobVertex vertex = new JobVertex("vertex");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(10);
+
+		return createSimpleTestGraph(new JobID(), restartStrategy, vertex);
+	}
+
+	/**
+	 * Creates an execution graph containing the given vertices.
+	 * 
+	 * <p>The execution graph uses {@link NoRestartStrategy} as the restart strategy.
+	 */
+	public static ExecutionGraph createSimpleTestGraph(JobID jid, JobVertex... vertices) throws Exception {
+		return createSimpleTestGraph(jid, new NoRestartStrategy(), vertices);
+	}
+
+	/**
+	 * Creates an execution graph containing the given vertices and the given restart strategy.
+	 */
+	public static ExecutionGraph createSimpleTestGraph(
+			JobID jid,
+			RestartStrategy restartStrategy,
+			JobVertex... vertices) throws Exception {
+
+		int numSlotsNeeded = 0;
+		for (JobVertex vertex : vertices) {
+			numSlotsNeeded += vertex.getParallelism();
 		}
+
+		SlotProvider slotProvider = new SimpleSlotProvider(jid, numSlotsNeeded);
+
+		return createSimpleTestGraph(jid, slotProvider, restartStrategy, vertices);
 	}
-	
-	// --------------------------------------------------------------------------------------------
+
+	public static ExecutionGraph createSimpleTestGraph(
+			JobID jid,
+			SlotProvider slotProvider,
+			RestartStrategy restartStrategy,
+			JobVertex... vertices) throws Exception {
+
+		checkNotNull(jid);
+		checkNotNull(restartStrategy);
+		checkNotNull(vertices);
+
+		return ExecutionGraphBuilder.buildGraph(
+				null,
+				new JobGraph(jid, "test job", vertices),
+				new Configuration(),
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				slotProvider,
+				ExecutionGraphTestUtils.class.getClassLoader(),
+				mock(CheckpointRecoveryFactory.class),
+				Time.seconds(10),
+				restartStrategy,
+				new UnregisteredMetricsGroup(),
+				1,
+				TEST_LOGGER);
+	}
+
+	// ------------------------------------------------------------------------
 	//  utility mocking methods
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 
 	public static Instance getInstance(final TaskManagerGateway gateway) throws Exception {
 		return getInstance(gateway, 1);
@@ -188,17 +379,7 @@ public class ExecutionGraphTestUtils {
 			new NoRestartStrategy(),
 			new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
 
-		ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
-				AkkaUtils.getDefaultTimeout()));
-
-		Answer<Void> noop = new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) {
-				return null;
-			}
-		};
-
-		return ejv;
+		return spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout()));
 	}
 	
 	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception {
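
A short, hedged sketch of how the new factory and state helpers above compose in a test (the timeout value is arbitrary and chosen only for illustration):

	package org.apache.flink.runtime.executiongraph;

	import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
	import org.apache.flink.runtime.jobgraph.JobStatus;

	import org.junit.Test;

	public class SimpleTestGraphUsageSketch {

		@Test
		public void runsSimpleGraphToFinished() throws Exception {
			// one vertex with parallelism 10, no restarts (see createSimpleTestGraph() above)
			ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new NoRestartStrategy());

			eg.scheduleForExecution();
			ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
			ExecutionGraphTestUtils.finishAllVertices(eg);

			// polling-based wait; FINISHED is terminal, so a fast transition cannot be missed here
			ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.FINISHED, 2000);
		}
	}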

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
new file mode 100644
index 0000000..0797ef9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+public class ExecutionGraphVariousFailuesTest {
+
+	/**
+	 * Tests that a failure in state RESTARTING retriggers the restarting logic. This means that
+	 * the graph only goes into the state FAILED after the restart strategy says the job is no
+	 * longer restartable.
+	 */
+	@Test
+	public void testFailureWhileRestarting() throws Exception {
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(2));
+		eg.scheduleForExecution();
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.failGlobal(new Exception("Test 1"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+
+		// we should restart since we have two restart attempts left
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		eg.failGlobal(new Exception("Test 2"));
+
+		// we should restart since we have one restart attempt left
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		eg.failGlobal(new Exception("Test 3"));
+
+		// after depleting all our restart attempts we should go into FAILED
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	/**
+	 * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
+	 * immediately and sets the execution graph's state to FAILED.
+	 */
+	@Test
+	public void testSuppressRestartFailureWhileRestarting() throws Exception {
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
+		eg.scheduleForExecution();
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		eg.failGlobal(new Exception("test"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		// suppress a possible restart
+		eg.failGlobal(new SuppressRestartsException(new Exception("Test")));
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	/**
+	 * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt
+	 * id will not fail the execution graph.
+	 */
+	@Test
+	public void testFailingScheduleOrUpdateConsumers() throws Exception {
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
+		eg.scheduleForExecution();
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		IntermediateResultPartitionID intermediateResultPartitionId = new IntermediateResultPartitionID();
+		ExecutionAttemptID producerId = new ExecutionAttemptID();
+		ResultPartitionID resultPartitionId = new ResultPartitionID(intermediateResultPartitionId, producerId);
+
+		// The execution attempt id does not exist and thus the scheduleOrUpdateConsumers call
+		// should fail
+
+		try {
+			eg.scheduleOrUpdateConsumers(resultPartitionId);
+			fail("Expected ExecutionGraphException.");
+		} catch (ExecutionGraphException e) {
+			// this exception is expected
+		}
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+	}
+}
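
The first test above exercises the attempt accounting of InfiniteDelayRestartStrategy(2): each global failure while restarting consumes one attempt, and the graph only reaches FAILED once that budget is exhausted. A generic, hedged sketch of such accounting (illustrative only, not the actual strategy implementation):

	/** Illustrative stand-in for a bounded restart budget; not the real InfiniteDelayRestartStrategy. */
	final class BoundedRestartBudget {

		private int remainingAttempts;

		BoundedRestartBudget(int maxAttempts) {
			this.remainingAttempts = maxAttempts;
		}

		/** Consulted on each failure while restarting; false means the graph transitions to FAILED. */
		boolean canRestart() {
			if (remainingAttempts > 0) {
				remainingAttempts--;
				return true;
			}
			return false;
		}
	}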

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 82561b2..fe6c3cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -403,41 +404,25 @@ public class ExecutionVertexCancelTest {
 	}
 
 	@Test
-	public void testSendCancelAndReceiveFail() {
-		try {
-			final JobVertexID jid = new JobVertexID();
-			final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
-			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-					AkkaUtils.getDefaultTimeout());
+	public void testSendCancelAndReceiveFail() throws Exception {
+		final ExecutionGraph graph = ExecutionGraphTestUtils.createSimpleTestGraph();
 
-			final ActorGateway gateway = new CancelSequenceActorGateway(
-					TestingUtils.defaultExecutionContext(),
-					1);
+		graph.scheduleForExecution();
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
+		assertEquals(JobStatus.RUNNING, graph.getState());
 
-			Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
-			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+		final ExecutionVertex[] vertices = graph.getVerticesTopologically().iterator().next().getTaskVertices();
+		assertEquals(vertices.length, graph.getRegisteredExecutions().size());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
-			setVertexResource(vertex, slot);
+		final Execution exec = vertices[3].getCurrentExecutionAttempt();
+		exec.cancel();
+		assertEquals(ExecutionState.CANCELING, exec.getState());
 
-			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
+		exec.markFailed(new Exception("test"));
+		assertTrue(exec.getState() == ExecutionState.FAILED || exec.getState() == ExecutionState.CANCELED);
 
-			vertex.cancel();
-
-			assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED);
-
-			vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
-
-			assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
-
-			assertTrue(slot.isReleased());
-
-			assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		assertTrue(exec.getAssignedResource().isCanceled());
+		assertEquals(vertices.length - 1, exec.getVertex().getExecutionGraph().getRegisteredExecutions().size());
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index eb85a33..0eed90d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -156,7 +156,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 
 		// mimic a restart: all vertices get re-initialized without actually being executed
 		for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
-			ejv.resetForNewExecution();
+			ejv.resetForNewExecution(System.currentTimeMillis(), graph.getGlobalModVersion());
 		}
 
 		// set new location for the sources and some state for the targets

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
deleted file mode 100644
index 5edaf91..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.BaseTestingActorGateway;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import scala.concurrent.ExecutionContext;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ExecutionVertex.class)
-public class ExecutionVertexStopTest extends TestLogger {
-
-	private static ActorSystem system;
-
-	private static boolean receivedStopSignal;
-
-	@AfterClass
-	public static void teardown(){
-		if(system != null) {
-			JavaTestKit.shutdownActorSystem(system);
-			system = null;
-		}
-	}
-
-	@Test
-	public void testStop() throws Exception {
-		final JobVertexID jid = new JobVertexID();
-		final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
-		Execution executionMock = mock(Execution.class);
-		whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
-
-		final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-				AkkaUtils.getDefaultTimeout());
-
-		vertex.stop();
-
-		verify(executionMock).stop();
-	}
-
-	@Test
-	public void testStopRpc() throws Exception {
-		final JobVertexID jid = new JobVertexID();
-		final ExecutionJobVertex ejv = getExecutionVertex(jid);
-
-		final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
-				AkkaUtils.getDefaultTimeout());
-		final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
-
-		setVertexState(vertex, ExecutionState.SCHEDULED);
-		assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
-
-		final ActorGateway gateway = new StopSequenceInstanceGateway(
-				TestingUtils.defaultExecutionContext());
-
-		Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
-		SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
-
-		vertex.deployToSlot(slot);
-
-		receivedStopSignal = false;
-		vertex.stop();
-		assertTrue(receivedStopSignal);
-	}
-
-	public static class StopSequenceInstanceGateway extends BaseTestingActorGateway {
-		private static final long serialVersionUID = 7611571264006653627L;
-
-		public StopSequenceInstanceGateway(ExecutionContext executionContext) {
-			super(executionContext);
-		}
-
-		@Override
-		public Object handleMessage(Object message) throws Exception {
-			Object result = null;
-			if (message instanceof TaskMessages.SubmitTask) {
-				result = Acknowledge.get();
-			} else if (message instanceof TaskMessages.StopTask) {
-				result = Acknowledge.get();
-				receivedStopSignal = true;
-			}
-
-			return result;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
new file mode 100644
index 0000000..8e0c04d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests that {@link JobVertex#finalizeOnMaster(ClassLoader)} is called properly and
+ * only when the execution graph reaches a successful final state.
+ */
+public class FinalizeOnMasterTest extends TestLogger {
+
+	@Test
+	public void testFinalizeIsCalledUponSuccess() throws Exception {
+		final JobID jid = new JobID();
+
+		final JobVertex vertex1 = spy(new JobVertex("test vertex 1"));
+		vertex1.setInvokableClass(NoOpInvokable.class);
+		vertex1.setParallelism(3);
+
+		final JobVertex vertex2 = spy(new JobVertex("test vertex 2"));
+		vertex2.setInvokableClass(NoOpInvokable.class);
+		vertex2.setParallelism(2);
+
+		final ExecutionGraph eg = createSimpleTestGraph(jid, vertex1, vertex2);
+		eg.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		// move all vertices to finished state
+		ExecutionGraphTestUtils.finishAllVertices(eg);
+		assertEquals(JobStatus.FINISHED, eg.waitUntilTerminal());
+
+		verify(vertex1, times(1)).finalizeOnMaster(any(ClassLoader.class));
+		verify(vertex2, times(1)).finalizeOnMaster(any(ClassLoader.class));
+
+		assertEquals(0, eg.getRegisteredExecutions().size());
+	}
+
+	@Test
+	public void testFinalizeIsNotCalledUponFailure() throws Exception {
+		final JobID jid = new JobID();
+
+		final JobVertex vertex = spy(new JobVertex("test vertex 1"));
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(1);
+
+		final ExecutionGraph eg = createSimpleTestGraph(jid, vertex);
+		eg.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
+
+		// fail the execution
+		final Execution exec = eg.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
+		exec.fail(new Exception("test"));
+
+		assertEquals(JobStatus.FAILED, eg.waitUntilTerminal());
+
+		verify(vertex, times(0)).finalizeOnMaster(any(ClassLoader.class));
+
+		assertEquals(0, eg.getRegisteredExecutions().size());
+	}
+}
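
The hook these tests pin down is JobVertex#finalizeOnMaster(ClassLoader), which per the assertions above is invoked exactly once and only for a successful final state. A hedged sketch of a vertex overriding it (names are illustrative; assumes the method is overridable as referenced in the tests above):

	import org.apache.flink.runtime.jobgraph.JobVertex;

	/** Illustrative vertex that performs master-side cleanup only after a successful run. */
	public class CleanupOnSuccessVertex extends JobVertex {

		public CleanupOnSuccessVertex(String name) {
			super(name);
		}

		@Override
		public void finalizeOnMaster(ClassLoader loader) throws Exception {
			// called on the master once the job reached FINISHED, e.g. to commit or clean up output
		}
	}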


[5/5] flink git commit: [FLINK-6312] [build] Update curator version to 2.12.0

Posted by se...@apache.org.
[FLINK-6312] [build] Update curator version to 2.12.0

The updated curator version includes a bugfix for a potential blocking issue.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aadfe45a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aadfe45a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aadfe45a

Branch: refs/heads/master
Commit: aadfe45a880d0f71f23c7a742ff5264e8dc14bf8
Parents: 821ec80
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Mon Apr 17 14:55:27 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm-examples/pom.xml                   | 4 ++++
 .../checkpoint/ZooKeeperCompletedCheckpointStoreTest.java    | 8 ++++----
 pom.xml                                                      | 2 +-
 3 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aadfe45a/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index b9da214..d042dde 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -84,6 +84,10 @@ under the License.
 					<groupId>org.apache.storm</groupId>
 					<artifactId>storm-core</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.apache.curator</groupId>
+					<artifactId>curator-framework</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aadfe45a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 8fc0f02..0d22dc6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.ErrorListenerPathable;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -128,12 +128,12 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 				.delete()
 				.deletingChildrenIfNeeded()
 				.inBackground(any(BackgroundCallback.class), any(Executor.class))
-		).thenAnswer(new Answer<Pathable<Void>>() {
+		).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
 			@Override
-			public Pathable<Void> answer(InvocationOnMock invocation) throws Throwable {
+			public ErrorListenerPathable<Void> answer(InvocationOnMock invocation) throws Throwable {
 				final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0];
 
-				Pathable<Void> result = mock(Pathable.class);
+				ErrorListenerPathable<Void> result = mock(ErrorListenerPathable.class);
 
 				when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aadfe45a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f822198..71ad779 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,7 @@ under the License.
 		<chill.version>0.7.4</chill.version>
 		<asm.version>5.0.4</asm.version>
 		<zookeeper.version>3.4.6</zookeeper.version>
-		<curator.version>2.8.0</curator.version>
+		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.7.4</jackson.version>
 		<metrics.version>3.1.0</metrics.version>
 		<junit.version>4.12</junit.version>