You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 21:29:16 UTC

[2/4] flink git commit: [FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure

[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb9e46e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb9e46e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb9e46e

Branch: refs/heads/master
Commit: 4eb9e46ee31e3f003c1a92322d13056cb4d4cfd5
Parents: b01d737
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Apr 18 14:15:29 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 21:40:04 2017 +0200

----------------------------------------------------------------------
 .../executiongraph/failover/FailoverRegion.java | 251 ++++++++++
 .../failover/FailoverStrategyLoader.java        |   8 +-
 .../RestartPipelinedRegionStrategy.java         | 206 ++++++++
 .../executiongraph/ExecutionGraphTestUtils.java |  38 +-
 .../executiongraph/FailoverRegionTest.java      | 490 +++++++++++++++++++
 .../PipelinedRegionFailoverConcurrencyTest.java | 353 +++++++++++++
 .../RestartPipelinedRegionStrategyTest.java     | 396 +++++++++++++++
 7 files changed, 1731 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
new file mode 100644
index 0000000..b36cfcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ *
+ * <p>A region starts in state RUNNING. When one of its tasks fails, it moves from RUNNING to
+ * CANCELLING, to CANCELED once all of its vertices reached a terminal state, to CREATED after
+ * all of its vertices have been reset, and at last back to RUNNING when the vertices are
+ * scheduled again.
+ */
+public class FailoverRegion {
+
+	private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
+
+	// ------------------------------------------------------------------------
+
+	/** a unique id for debugging */
+	private final AbstractID id = new AbstractID();
+
+	/** The execution graph that contains the vertices of this region. */
+	private final ExecutionGraph executionGraph;
+
+	/** All execution vertices that belong to this region. */
+	private final List<ExecutionVertex> connectedExecutionVertexes;
+
+	/** The executor that executes the recovery action after all vertices are in a terminal state. */
+	private final Executor executor;
+
+	/** Current state of this failover region. */
+	private volatile JobStatus state = JobStatus.RUNNING;
+
+
+	/**
+	 * Creates a new failover region.
+	 *
+	 * @param executionGraph The execution graph that contains the vertices of this region
+	 * @param executor The executor that runs the recovery once all vertices are in a terminal state
+	 * @param connectedExecutions The execution vertices that form this region
+	 */
+	public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.executor = checkNotNull(executor);
+		this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+		LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+	}
+
+	/**
+	 * Called when an execution of this region failed. If the restart strategy does not allow
+	 * restarting, the failure is escalated to a global failure of the execution graph. Otherwise
+	 * the region is cancelled, and subsequently reset and restarted.
+	 *
+	 * @param taskExecution The execution that failed
+	 * @param cause The cause of the failure
+	 */
+	public void onExecutionFail(Execution taskExecution, Throwable cause) {
+		// TODO: check if need to failover the preceding region
+		if (!executionGraph.getRestartStrategy().canRestart()) {
+			// delegate the failure to a global fail that will check the restart strategy and not restart
+			executionGraph.failGlobal(cause);
+		}
+		else {
+			cancel(taskExecution.getGlobalModVersion());
+		}
+	}
+
+	/**
+	 * Called once all vertices of this region reached a terminal state after cancellation.
+	 * Completes the transition CANCELLING -> CANCELED and then resets the region.
+	 */
+	private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+		while (true) {
+			JobStatus curStatus = this.state;
+			if (curStatus.equals(JobStatus.CANCELLING)) {
+				if (transitionState(curStatus, JobStatus.CANCELED)) {
+					reset(globalModVersionOfFailover);
+					break;
+				}
+				// lost a race on the state transition: re-read the state and retry
+			}
+			else {
+				LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", id, curStatus);
+				break;
+			}
+		}
+	}
+
+	/**
+	 * Returns the current state of this failover region.
+	 */
+	public JobStatus getState() {
+		return state;
+	}
+
+	/**
+	 * get all execution vertexes contained in this region
+	 */
+	public List<ExecutionVertex> getAllExecutionVertexes() {
+		return connectedExecutionVertexes;
+	}
+
+	/**
+	 * Notifies the region to failover (again). Escalates to a global failure if the restart
+	 * strategy forbids restarting. Otherwise it cancels a RUNNING region, or resets a CANCELED one.
+	 */
+	private void failover(long globalModVersionOfFailover) {
+		if (!executionGraph.getRestartStrategy().canRestart()) {
+			executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail"));
+		}
+		else {
+			JobStatus curStatus = this.state;
+			if (curStatus.equals(JobStatus.RUNNING)) {
+				cancel(globalModVersionOfFailover);
+			}
+			else if (curStatus.equals(JobStatus.CANCELED)) {
+				reset(globalModVersionOfFailover);
+			}
+			else {
+				LOG.info("FailoverRegion {} is {} when notified to failover.", id, curStatus);
+			}
+		}
+	}
+
+	/**
+	 * Cancels all executions in this sub graph. It transitions RUNNING -> CANCELLING and,
+	 * once all vertices reached a terminal state, continues (on the executor) with
+	 * {@link #allVerticesInTerminalState(long)}.
+	 */
+	private void cancel(final long globalModVersionOfFailover) {
+		while (true) {
+			JobStatus curStatus = this.state;
+			if (curStatus.equals(JobStatus.RUNNING)) {
+				if (transitionState(curStatus, JobStatus.CANCELLING)) {
+
+					// we build a future that is complete once all vertices have reached a terminal state
+					final ArrayList<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
+
+					// cancel all tasks (that still need cancelling)
+					for (ExecutionVertex vertex : connectedExecutionVertexes) {
+						futures.add(vertex.cancel());
+					}
+
+					final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
+					allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
+						@Override
+						public void accept(Void value) {
+							allVerticesInTerminalState(globalModVersionOfFailover);
+						}
+					}, executor);
+
+					break;
+				}
+				// lost a race on the state transition: re-read the state and retry
+			}
+			else {
+				LOG.info("FailoverRegion {} is {} when cancel.", id, curStatus);
+				break;
+			}
+		}
+	}
+
+	/**
+	 * Resets all executions in this sub graph for a new execution attempt. It then transitions
+	 * CANCELED -> CREATED and restarts the region. On a global mod version mismatch, a global
+	 * recovery has taken over, and the region simply goes back to RUNNING.
+	 */
+	private void reset(long globalModVersionOfFailover) {
+		try {
+			// reset all connected ExecutionVertexes
+			final Collection<CoLocationGroup> colGroups = new HashSet<>();
+			final long restartTimestamp = System.currentTimeMillis();
+
+			for (ExecutionVertex ev : connectedExecutionVertexes) {
+				CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
+				// reset the constraints of every co-location group only once
+				if (cgroup != null && colGroups.add(cgroup)) {
+					cgroup.resetConstraints();
+				}
+
+				ev.resetForNewExecution(restartTimestamp, globalModVersionOfFailover);
+			}
+			if (transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
+				restart(globalModVersionOfFailover);
+			}
+			else {
+				LOG.info("FailoverRegion {} switched from CANCELED to CREATED fail, will fail this region again.", id);
+				failover(globalModVersionOfFailover);
+			}
+		}
+		catch (GlobalModVersionMismatch e) {
+			// happens when a global recovery happens concurrently to the regional recovery
+			// go back to a clean state
+			state = JobStatus.RUNNING;
+		}
+		catch (Throwable e) {
+			LOG.info("FailoverRegion {} reset fail, will failover again.", id, e);
+			failover(globalModVersionOfFailover);
+		}
+	}
+
+	/**
+	 * Restarts all executions in this sub graph (CREATED -> RUNNING). If scheduling any vertex
+	 * fails, the region is failed over again and this restart attempt is abandoned.
+	 */
+	private void restart(long globalModVersionOfFailover) {
+		try {
+			if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
+				// if we have checkpointed state, reload it into the executions
+				//TODO: checkpoint support restore part ExecutionVertex cp
+				/*
+				if (executionGraph.getCheckpointCoordinator() != null) {
+					executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+							connectedExecutionVertexes, false, false);
+				}
+				*/
+				//TODO, use restart strategy to schedule them.
+				//restart all connected ExecutionVertexes
+				for (ExecutionVertex ev : connectedExecutionVertexes) {
+					try {
+						ev.scheduleForExecution(
+								executionGraph.getSlotProvider(),
+								executionGraph.isQueuedSchedulingAllowed());
+					}
+					catch (Throwable e) {
+						LOG.info("FailoverRegion {} failed to schedule {}, will failover again.",
+								id, ev.getTaskNameWithSubtaskIndex(), e);
+						failover(globalModVersionOfFailover);
+						// the failover started a new recovery cycle for the whole region, so this
+						// (now stale) restart attempt must not schedule any further vertices
+						return;
+					}
+				}
+			}
+			else {
+				LOG.info("FailoverRegion {} switched from CREATED to RUNNING fail, will fail this region again.", id);
+				failover(globalModVersionOfFailover);
+			}
+		} catch (Exception e) {
+			LOG.info("FailoverRegion {} restart failed, failover again.", id, e);
+			failover(globalModVersionOfFailover);
+		}
+	}
+
+	/**
+	 * Atomically switches the region from {@code current} to {@code newState}.
+	 *
+	 * @return true if the region was in {@code current} and has been switched, false otherwise
+	 */
+	private boolean transitionState(JobStatus current, JobStatus newState) {
+		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
+			LOG.info("FailoverRegion {} switched from state {} to {}.", id, current, newState);
+			return true;
+		}
+		else {
+			return false;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
index f18a90f..8b6fa6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -35,9 +35,12 @@ public class FailoverStrategyLoader {
 	/** Config name for the {@link RestartAllStrategy} */
 	public static final String FULL_RESTART_STRATEGY_NAME = "full";
 
-	/** Config name for the strategy that restarts individual tasks */
+	/** Config name for the {@link RestartIndividualStrategy} */
 	public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
 
+	/** Config name for the {@link RestartPipelinedRegionStrategy} */
+	public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -59,6 +62,9 @@ public class FailoverStrategyLoader {
 				case FULL_RESTART_STRATEGY_NAME:
 					return new RestartAllStrategy.Factory();
 
+				case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+					return new RestartPipelinedRegionStrategy.Factory();
+
 				case INDIVIDUAL_RESTART_STRATEGY_NAME:
 					return new RestartIndividualStrategy.Factory();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
new file mode 100644
index 0000000..0a5baa8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A failover strategy that restarts regions of the ExecutionGraph. A region is defined
+ * by this strategy as the weakly connected component of tasks that communicate via pipelined
+ * data exchange.
+ */
+public class RestartPipelinedRegionStrategy extends FailoverStrategy {
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
+
+	/** The execution graph on which this FailoverStrategy works */
+	private final ExecutionGraph executionGraph;
+
+	/** The executor used for future actions */
+	private final Executor executor;
+
+	/** Fast lookup from vertex to failover region */
+	private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;
+
+
+	/**
+	 * Creates a new failover strategy to restart pipelined regions that works on the given
+	 * execution graph and uses the execution graph's future executor to call restart actions.
+	 * 
+	 * @param executionGraph The execution graph on which this FailoverStrategy will work
+	 */
+	public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph) {
+		this(executionGraph, executionGraph.getFutureExecutor());
+	}
+
+	/**
+	 * Creates a new failover strategy to restart pipelined regions that works on the given
+	 * execution graph and uses the given executor to call restart actions.
+	 * 
+	 * @param executionGraph The execution graph on which this FailoverStrategy will work
+	 * @param executor  The executor used for future actions
+	 */
+	public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph, Executor executor) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.executor = checkNotNull(executor);
+		this.vertexToRegion = new HashMap<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  failover implementation
+	// ------------------------------------------------------------------------ 
+
+	@Override
+	public void onTaskFailure(Execution taskExecution, Throwable cause) {
+		final ExecutionVertex ev = taskExecution.getVertex();
+		final FailoverRegion failoverRegion = vertexToRegion.get(ev);
+
+		if (failoverRegion == null) {
+			executionGraph.failGlobal(new FlinkException(
+					"Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex()));
+		}
+		else {
+			failoverRegion.onExecutionFail(taskExecution, cause);
+		}
+	}
+
+	@Override
+	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+		LOG.debug("Generating failover regions for {} new job vertices", newJobVerticesTopological.size());
+		generateAllFailoverRegion(newJobVerticesTopological);
+	}
+
+	@Override
+	public String getStrategyName() {
+		return "Pipelined Region Failover";
+	}
+
+	// Generate all the FailoverRegion from the new added job vertexes
+	private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) {
+		for (ExecutionJobVertex ejv : newJobVerticesTopological) {
+			for (ExecutionVertex ev : ejv.getTaskVertices()) {
+				if (getFailoverRegion(ev) != null) {
+					continue;
+				}
+				List<ExecutionVertex> pipelinedExecutions = new ArrayList<>();
+				List<ExecutionVertex> orgExecutions = new ArrayList<>();
+				orgExecutions.add(ev);
+				pipelinedExecutions.add(ev);
+				getAllPipelinedConnectedVertexes(orgExecutions, pipelinedExecutions);
+
+				FailoverRegion region = new FailoverRegion(executionGraph, executor, pipelinedExecutions);
+				for (ExecutionVertex vertex : pipelinedExecutions) {
+					vertexToRegion.put(vertex, region);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Get all connected executions of the original executions
+	 *
+	 * @param orgExecutions  the original execution vertexes
+	 * @param connectedExecutions  the total connected executions
+	 */
+	private static void getAllPipelinedConnectedVertexes(List<ExecutionVertex> orgExecutions, List<ExecutionVertex> connectedExecutions) {
+		List<ExecutionVertex> newAddedExecutions = new ArrayList<>();
+		for (ExecutionVertex ev : orgExecutions) {
+			// Add downstream ExecutionVertex
+			for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) {
+				if (irp.getIntermediateResult().getResultType().isPipelined()) {
+					for (List<ExecutionEdge> consumers : irp.getConsumers()) {
+						for (ExecutionEdge consumer : consumers) {
+							ExecutionVertex cev = consumer.getTarget();
+							if (!connectedExecutions.contains(cev)) {
+								newAddedExecutions.add(cev);
+							}
+						}
+					}
+				}
+			}
+			if (!newAddedExecutions.isEmpty()) {
+				connectedExecutions.addAll(newAddedExecutions);
+				getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions);
+				newAddedExecutions.clear();
+			}
+			// Add upstream ExecutionVertex
+			int inputNum = ev.getNumberOfInputs();
+			for (int i = 0; i < inputNum; i++) {
+				for (ExecutionEdge input : ev.getInputEdges(i)) {
+					if (input.getSource().getIntermediateResult().getResultType().isPipelined()) {
+						ExecutionVertex pev = input.getSource().getProducer();
+						if (!connectedExecutions.contains(pev)) {
+							newAddedExecutions.add(pev);
+						}
+					}
+				}
+			}
+			if (!newAddedExecutions.isEmpty()) {
+				connectedExecutions.addAll(0, newAddedExecutions);
+				getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions);
+				newAddedExecutions.clear();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Finds the failover region that contains the given execution vertex.
+ 	 */
+	@VisibleForTesting
+	public FailoverRegion getFailoverRegion(ExecutionVertex ev) {
+		return vertexToRegion.get(ev);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory that instantiates the RestartPipelinedRegionStrategy.
+	 */
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			return new RestartPipelinedRegionStrategy(executionGraph);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 0d7e389..140e984 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -137,6 +138,27 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 
+	public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis)
+			throws TimeoutException {
+
+		checkNotNull(region);
+		checkNotNull(status);
+		checkArgument(maxWaitMillis >= 0);
+
+		// this is a poor implementation - we may want to improve it eventually
+		final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+		while (region.getState() != status && System.nanoTime() < deadline) {
+			try {
+				Thread.sleep(2);
+			} catch (InterruptedException ignored) {}
+		}
+
+		if (System.nanoTime() >= deadline) {
+			throw new TimeoutException();
+		}
+	}
+
 	/**
 	 * Takes all vertices in the given ExecutionGraph and switches their current
 	 * execution to RUNNING.
@@ -320,21 +342,17 @@ public class ExecutionGraphTestUtils {
 
 		@Override
 		public Object handleMessage(Object message) {
-			Object result = null;
-			if(message instanceof SubmitTask) {
+			if (message instanceof SubmitTask) {
 				SubmitTask submitTask = (SubmitTask) message;
 				lastTDD = submitTask.tasks();
-
-				result = Acknowledge.get();
+				return Acknowledge.get();
 			} else if(message instanceof CancelTask) {
-				CancelTask cancelTask = (CancelTask) message;
-
-				result = Acknowledge.get();
+				return Acknowledge.get();
 			} else if(message instanceof FailIntermediateResultPartitions) {
-				result = new Object();
+				return new Object();
+			} else {
+				return null;
 			}
-
-			return result;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
new file mode 100644
index 0000000..beeefb1b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilFailoverRegionState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FailoverRegionTest extends TestLogger {
+
+	/**
+	 * Tests that a job consisting of a single failover region recovers from a task failure.
+	 * The region goes to CANCELLING when a task fails, and returns to RUNNING once all of its
+	 * tasks have completed their cancellation.
+	 */
+	@Test
+	public void testSingleRegionFailover() throws Exception {
+		final ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy(10));
+		final RestartPipelinedRegionStrategy failoverStrategy =
+				(RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+
+		final ExecutionVertex failingVertex = graph.getAllExecutionVertices().iterator().next();
+
+		// initially the (only) region is running
+		assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(failingVertex).getState());
+
+		// failing one task cancels the whole region
+		failingVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(failingVertex).getState());
+
+		// once every task acknowledged the cancellation, the region is running again
+		for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+		assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(failingVertex).getState());
+	}
+
+	/**
+	 * Tests that a job has several failover regions and one region failover does not influence others.
+	 *
+	 * <pre>
+	 *     (a1) ---> (b1) -+-> (c1) ---+-> (d1)
+	 *                     X          /
+	 *     (a2) ---> (b2) -+-> (c2) -+
+	 *
+	 *           ^         ^         ^
+	 *           |         |         |
+	 *     (pipelined) (blocking) (pipelined)
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testMultiRegionsFailover() throws Exception {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+
+		final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		JobVertex v4 = new JobVertex("vertex4");
+
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v4.setParallelism(1);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v4.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jobId,
+				jobName,
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new InfiniteDelayRestartStrategy(10),
+				new FailoverPipelinedRegionWithDirectExecutor(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				slotProvider,
+				ExecutionGraph.class.getClassLoader());
+
+		eg.attachJobGraph(ordered);
+
+		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+		// the following two vertices are in the same failover region
+		ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+
+		// the following two vertices are in the same failover region
+		ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+		ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+
+		// the following vertices are in one failover region
+		ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+		ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+		ExecutionVertex ev4 = eg.getJobVertex(v4.getID()).getTaskVertices()[0];
+
+		eg.scheduleForExecution();
+
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+
+		ev21.scheduleForExecution(slotProvider, true);
+		ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+		ev11.getCurrentExecutionAttempt().cancelingComplete();
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+		ev11.getCurrentExecutionAttempt().markFinished();
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.scheduleForExecution(slotProvider, true);
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+		waitUntilExecutionState(ev31.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000);
+		waitUntilExecutionState(ev32.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 2000);
+
+		ev31.getCurrentExecutionAttempt().fail(new Exception("New fail"));
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());
+		// (d1) consumes (c1, c2) via a pipelined exchange, so it belongs to the failing region
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev4).getState());
+
+		ev32.getCurrentExecutionAttempt().cancelingComplete();
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+	}
+
+	/**
+	 * Tests that when a task fails and the restart strategy does not permit restarting,
+	 * the job as a whole ends up in {@link JobStatus#FAILED}.
+	 */
+	@Test
+	public void testNoManualRestart() throws Exception {
+		final ExecutionGraph graph = createSingleRegionExecutionGraph(new NoRestartStrategy());
+
+		// fail the first vertex of the graph
+		final ExecutionVertex failingVertex = graph.getAllExecutionVertices().iterator().next();
+		failingVertex.fail(new Exception("Test Exception"));
+
+		// report cancellation as complete for every execution of the graph
+		for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		assertEquals(JobStatus.FAILED, graph.getState());
+	}
+
+	/**
+	 * Tests that two failover regions failover at the same time, they will not influence each orther
+	 * @throws Exception
+	 */
+	@Test
+	public void testMutilRegionFailoverAtSameTime() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new ActorTaskManagerGateway(
+						new SimpleActorGateway(TestingUtils.directExecutionContext())),
+				16);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		JobVertex v4 = new JobVertex("vertex4");
+
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v4.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v4.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jobId,
+				jobName,
+				cfg,
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new InfiniteDelayRestartStrategy(10),
+				new RestartPipelinedRegionStrategy.Factory(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				scheduler,
+				ExecutionGraph.class.getClassLoader());
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		eg.scheduleForExecution();
+		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+		ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+		ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+		ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev31).getState());
+
+		ev11.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+		ev31.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev31).getState());
+
+		ev32.getCurrentExecutionAttempt().cancelingComplete();
+		waitUntilFailoverRegionState(strategy.getFailoverRegion(ev31), JobStatus.RUNNING, 1000);
+
+		ev12.getCurrentExecutionAttempt().cancelingComplete();
+		waitUntilFailoverRegionState(strategy.getFailoverRegion(ev11), JobStatus.RUNNING, 1000);
+	}
+
+	/**
+	 * Tests that if a task reports the result of its preceding task is failed,
+	 * its preceding task will be considered as failed, and start to failover
+	 * TODO: as the report part is not finished yet, this case is ignored temporarily
+	 * @throws Exception
+	 */
+	@Ignore
+	@Test
+	public void testSucceedingNoticePreceding() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new ActorTaskManagerGateway(
+						new SimpleActorGateway(TestingUtils.directExecutionContext())),
+				14);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+
+		v1.setParallelism(1);
+		v2.setParallelism(1);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jobId,
+				jobName,
+				cfg,
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new InfiniteDelayRestartStrategy(10),
+				new FailoverPipelinedRegionWithDirectExecutor(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				scheduler,
+				ExecutionGraph.class.getClassLoader());
+		try {
+			eg.attachJobGraph(ordered);
+		}
+		catch (JobException e) {
+			e.printStackTrace();
+			fail("Job failed with exception: " + e.getMessage());
+		}
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+		ExecutionVertex ev11 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+		ev21.getCurrentExecutionAttempt().fail(new Exception("Fail with v1"));
+
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev21).getState());
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
+	}
+
+	/**
+	 * Tests that another task failure arriving while the failover region is still CANCELLING
+	 * keeps the region in CANCELLING and leaves the job RUNNING.
+	 */
+	@Test
+	public void testFailWhileCancelling() throws Exception {
+		final ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy());
+		final RestartPipelinedRegionStrategy failoverStrategy =
+				(RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+		final Iterator<ExecutionVertex> vertices = graph.getAllExecutionVertices().iterator();
+
+		// bring the first vertex to RUNNING, then fail it so that its region starts cancelling
+		final ExecutionVertex first = vertices.next();
+		first.getCurrentExecutionAttempt().switchToRunning();
+		assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(first).getState());
+
+		first.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+		assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(first).getState());
+
+		// a further failure must neither fail the job nor move the region out of CANCELLING
+		final ExecutionVertex second = vertices.next();
+		second.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(first).getState());
+	}
+
+	/**
+	 * Tests that a failover region which has already been restarted can fail over again:
+	 * a new failure after the restart moves the region back to CANCELLING.
+	 */
+	@Test
+	public void testFailWhileRestarting() throws Exception {
+		final ExecutionGraph graph = createSingleRegionExecutionGraph(new InfiniteDelayRestartStrategy());
+		final RestartPipelinedRegionStrategy failoverStrategy =
+				(RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
+
+		final ExecutionVertex vertex = graph.getAllExecutionVertices().iterator().next();
+		assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(vertex).getState());
+
+		// first failure: the region starts cancelling
+		vertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+		assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(vertex).getState());
+
+		// acknowledge the cancellation of all executions; the region is RUNNING again afterwards
+		for (ExecutionVertex each : graph.getAllExecutionVertices()) {
+			each.getCurrentExecutionAttempt().cancelingComplete();
+		}
+		assertEquals(JobStatus.RUNNING, failoverStrategy.getFailoverRegion(vertex).getState());
+
+		// second failure, on the vertex's current attempt, starts another failover
+		vertex.getCurrentExecutionAttempt().fail(new Exception("new fail"));
+		assertEquals(JobStatus.CANCELLING, failoverStrategy.getFailoverRegion(vertex).getState());
+	}
+
+	/**
+	 * Creates and schedules an execution graph in which v1 (parallelism 3) and v2 (parallelism 2)
+	 * both feed v3 (parallelism 2) through all-to-all pipelined edges, so all vertices are meant to
+	 * form a single failover region. The graph uses a {@link RestartPipelinedRegionStrategy} that
+	 * runs on a direct executor.
+	 *
+	 * @param restartStrategy the restart strategy to use for the graph
+	 * @return the scheduled execution graph
+	 * @throws Exception if the graph cannot be created, attached or scheduled
+	 */
+	private static ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new ActorTaskManagerGateway(
+						new SimpleActorGateway(TestingUtils.directExecutionContext())),
+				14);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final Configuration cfg = new Configuration();
+
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+
+		v1.setParallelism(3);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+
+		v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jobId,
+				jobName,
+				cfg,
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				restartStrategy,
+				new FailoverPipelinedRegionWithDirectExecutor(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				scheduler,
+				ExecutionGraph.class.getClassLoader());
+
+		// a JobException propagates to the calling test, which then reports it with its full
+		// stack trace (instead of printing it to stderr and failing with the message only)
+		eg.attachJobGraph(ordered);
+
+		eg.scheduleForExecution();
+		return eg;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for a {@link RestartPipelinedRegionStrategy} whose failover actions are executed
+	 * by a direct (synchronous) executor, which keeps the tests simple to reason about.
+	 */
+	private static class FailoverPipelinedRegionWithDirectExecutor implements Factory {
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			final RestartPipelinedRegionStrategy strategy =
+					new RestartPipelinedRegionStrategy(executionGraph, Executors.directExecutor());
+			return strategy;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
new file mode 100644
index 0000000..635ec75
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests make sure that global failover (restart all) always takes precedence over
+ * local recovery strategies for the {@link RestartPipelinedRegionStrategy}.
+ *
+ * <p>This test must be in the package it resides in, because it uses package-private methods
+ * from the ExecutionGraph classes.
+ */
+public class PipelinedRegionFailoverConcurrencyTest {
+
+	/**
+	 * Tests that a cancellation concurrent to a local failover leads to a properly
+	 * cancelled state.
+	 */
+	@Test
+	public void testCancelWhileInLocalFailover() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cancel the job to go into cancelling
+		//  - resume in local recovery action
+		//  - validate that this does in fact not start a new task, because the graph as a
+		//    whole should now be cancelled already
+
+		final JobID jid = new JobID();
+		final int parallelism = 2;
+
+		final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		final ExecutionGraph graph = createSampleGraph(
+				jid,
+				new FailoverPipelinedRegionWithCustomExecutor(executor),
+				new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0),
+				slotProvider,
+				parallelism);
+
+		final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+		final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+		final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		// let one of the vertices fail - that triggers a local recovery action
+		vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+		// graph should still be running and the failover recovery action should be queued
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// now cancel the job
+		graph.cancel();
+
+		assertEquals(JobStatus.CANCELLING, graph.getState());
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+		// let the recovery action continue
+		executor.trigger();
+
+		// now report that cancelling is complete for the other vertex
+		vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+		assertEquals(JobStatus.CANCELED, graph.getState());
+		assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+		assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+		// make sure all slots are recycled
+		assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+	}
+
+	/**
+	 * Tests that a terminal global failure concurrent to a local failover
+	 * leads to a properly failed state.
+	 */
+	@Test
+	public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cause a global failure
+		//  - resume in local recovery action
+		//  - validate that this does in fact not start a new task, because the graph as a
+		//    whole should now be terminally failed already
+
+		final JobID jid = new JobID();
+		final int parallelism = 2;
+
+		final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		final ExecutionGraph graph = createSampleGraph(
+				jid,
+				new FailoverPipelinedRegionWithCustomExecutor(executor),
+				new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0),
+				slotProvider,
+				parallelism);
+
+		final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+		final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+		final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		// let one of the vertices fail - that triggers a local recovery action
+		vertex1.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+
+		// graph should still be running and the failover recovery action should be queued
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// now fail the job globally, with a failure that suppresses restarts
+		graph.failGlobal(new SuppressRestartsException(new Exception("test exception")));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+		assertEquals(ExecutionState.FAILED, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.CANCELING, vertex2.getCurrentExecutionAttempt().getState());
+
+		// let the recovery action continue
+		executor.trigger();
+
+		// now report that cancelling is complete for the other vertex
+		vertex2.getCurrentExecutionAttempt().cancelingComplete();
+
+		assertEquals(JobStatus.FAILED, graph.getState());
+		assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
+		assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
+
+		// make sure all slots are recycled
+		assertEquals(parallelism, slotProvider.getNumberOfAvailableSlots());
+	}
+
+	/**
+	 * Tests that a local failover does not try to trump a global failover.
+	 */
+	@Test
+	public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cause a global failure that is recovering immediately
+		//  - resume in local recovery action
+		//  - validate that this does in fact not cause another task restart, because the global
+		//    recovery should already have restarted the task graph
+
+		final JobID jid = new JobID();
+		final int parallelism = 2;
+
+		final ManuallyTriggeredDirectExecutor executor = new ManuallyTriggeredDirectExecutor();
+
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(jid, parallelism);
+
+		final ExecutionGraph graph = createSampleGraph(
+				jid,
+				new FailoverPipelinedRegionWithCustomExecutor(executor),
+				new FixedDelayRestartStrategy(2, 0), // restart at most twice, no delay
+				slotProvider,
+				parallelism);
+		RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)graph.getFailoverStrategy();
+
+		final ExecutionJobVertex ejv = graph.getVerticesTopologically().iterator().next();
+		final ExecutionVertex vertex1 = ejv.getTaskVertices()[0];
+		final ExecutionVertex vertex2 = ejv.getTaskVertices()[1];
+
+		graph.scheduleForExecution();
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState());
+
+		// let one of the vertices fail - that triggers a local recovery action
+		vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(vertex2).getState());
+
+		// graph should still be running and the failover recovery action should be queued
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// now fail the job globally, with a failure that permits a (global) restart
+		graph.failGlobal(new Exception("test exception"));
+
+		assertEquals(JobStatus.FAILING, graph.getState());
+		assertEquals(ExecutionState.FAILED, vertex2.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.CANCELING, vertex1.getCurrentExecutionAttempt().getState());
+
+		// now report that cancelling is complete for the other vertex
+		vertex1.getCurrentExecutionAttempt().cancelingComplete();
+
+		waitUntilJobStatus(graph, JobStatus.RUNNING, 1000);
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		waitUntilExecutionState(vertex1.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+		waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+		vertex1.getCurrentExecutionAttempt().switchToRunning();
+		vertex2.getCurrentExecutionAttempt().switchToRunning();
+		assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+
+		// let the recovery action continue - this should do nothing any more
+		executor.trigger();
+
+		// validate that the graph is still peachy
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex2).getState());
+		assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+		assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+		assertEquals(1, vertex2.getCopyOfPriorExecutionsList().size());
+
+		// make sure all slots are in use
+		assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+
+		// validate that a task failure then can be handled by the local recovery
+		vertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
+		assertEquals(1, executor.numQueuedRunnables());
+
+		// let the local recovery action continue - this should recover the vertex2
+		executor.trigger();
+
+		waitUntilExecutionState(vertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000);
+		vertex2.getCurrentExecutionAttempt().switchToRunning();
+
+		// validate the local recovery result
+		assertEquals(JobStatus.RUNNING, graph.getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex1).getState());
+		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(vertex2).getState());
+		assertEquals(ExecutionState.RUNNING, vertex1.getCurrentExecutionAttempt().getState());
+		assertEquals(ExecutionState.RUNNING, vertex2.getCurrentExecutionAttempt().getState());
+		assertEquals(1, vertex1.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(2, vertex2.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, vertex1.getCopyOfPriorExecutionsList().size());
+		assertEquals(2, vertex2.getCopyOfPriorExecutionsList().size());
+
+		// make sure all slots are in use
+		assertEquals(0, slotProvider.getNumberOfAvailableSlots());
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an execution graph consisting of a single job vertex with the given parallelism.
+	 *
+	 * @param jid the job ID used for both the execution graph and the job graph
+	 * @param failoverStrategy factory for the failover strategy of the graph
+	 * @param restartStrategy the restart strategy of the graph
+	 * @param slotProvider provides the slots for the tasks
+	 * @param parallelism the parallelism of the single job vertex
+	 * @return the created (not yet scheduled) execution graph
+	 * @throws Exception if the graph cannot be created or the job graph cannot be attached
+	 */
+	private ExecutionGraph createSampleGraph(
+			JobID jid,
+			Factory failoverStrategy,
+			RestartStrategy restartStrategy,
+			SlotProvider slotProvider,
+			int parallelism) throws Exception {
+
+		// build a simple execution graph with one job vertex of the given parallelism
+		final ExecutionGraph graph = new ExecutionGraph(
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				jid,
+				"test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				Time.seconds(10),
+				restartStrategy,
+				failoverStrategy,
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList(),
+				slotProvider,
+				getClass().getClassLoader());
+
+		JobVertex jv = new JobVertex("test vertex");
+		jv.setInvokableClass(NoOpInvokable.class);
+		jv.setParallelism(parallelism);
+
+		JobGraph jg = new JobGraph(jid, "testjob", jv);
+		graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+		return graph;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for a {@link RestartPipelinedRegionStrategy} that runs its failover actions on a
+	 * caller-supplied executor, e.g. a {@link ManuallyTriggeredDirectExecutor} to delay them.
+	 */
+	private static class FailoverPipelinedRegionWithCustomExecutor implements Factory {
+
+		private final Executor executor;
+
+		FailoverPipelinedRegionWithCustomExecutor(Executor executor) {
+			this.executor = executor;
+		}
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			return new RestartPipelinedRegionStrategy(executionGraph, executor);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4eb9e46e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
new file mode 100644
index 0000000..45aabe6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
+import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests how the {@link RestartPipelinedRegionStrategy} partitions an {@link ExecutionGraph}
+ * into {@link FailoverRegion}s.
+ *
+ * <p>Subtasks that exchange data through pipelined result partitions are expected to share a
+ * failover region. Blocking result partitions act as region boundaries.
+ */
+public class RestartPipelinedRegionStrategyTest {
+
+	/**
+	 * Tests that a graph whose vertices are all connected through pipelined all-to-all
+	 * exchanges forms a single failover region. The JobGraph has the following form:
+	 *
+	 * <pre>
+	 *  v1--->v2-->\
+	 *              \
+	 *               v4 --->\
+	 *        ----->/        \
+	 *  v3-->/                v5
+	 *       \               /
+	 *        ------------->/
+	 * </pre>
+	 */
+	@Test
+	public void testSimpleFailoverRegion() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 5);
+		JobVertex v2 = createVertex("vertex2", 7);
+		JobVertex v3 = createVertex("vertex3", 2);
+		JobVertex v4 = createVertex("vertex4", 11);
+		JobVertex v5 = createVertex("vertex5", 4);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);
+		RestartPipelinedRegionStrategy strategy = getStrategy(eg);
+
+		FailoverRegion region1 = getRegion(eg, strategy, v1, 2);
+		FailoverRegion region2 = getRegion(eg, strategy, v2, 3);
+		FailoverRegion region3 = getRegion(eg, strategy, v3, 0);
+		FailoverRegion region4 = getRegion(eg, strategy, v4, 4);
+		FailoverRegion region5 = getRegion(eg, strategy, v5, 1);
+
+		// all subtasks are in one failover region; the null check keeps the
+		// equality assertions below from passing vacuously
+		assertNotNull(region2);
+		assertEquals(region1, region2);
+		assertEquals(region3, region2);
+		assertEquals(region4, region2);
+		assertEquals(region5, region2);
+	}
+
+	/**
+	 * Tests that blocking exchanges split the graph into several failover regions.
+	 * The JobGraph has the following form, where {@code |} marks a blocking exchange:
+	 *
+	 * <pre>
+	 *  v2 ------->\
+	 *              \
+	 *  v1---------> v4 --->|\
+	 *                        \
+	 *                        v5
+	 *                       /
+	 *  v3--------------->|/
+	 * </pre>
+	 */
+	@Test
+	public void testMultipleFailoverRegions() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 3);
+		JobVertex v2 = createVertex("vertex2", 2);
+		JobVertex v3 = createVertex("vertex3", 2);
+		JobVertex v4 = createVertex("vertex4", 5);
+		JobVertex v5 = createVertex("vertex5", 2);
+
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);
+		RestartPipelinedRegionStrategy strategy = getStrategy(eg);
+
+		FailoverRegion region1 = getRegion(eg, strategy, v1, 1);
+		FailoverRegion region2 = getRegion(eg, strategy, v2, 0);
+		FailoverRegion region4 = getRegion(eg, strategy, v4, 3);
+		FailoverRegion region31 = getRegion(eg, strategy, v3, 0);
+		FailoverRegion region32 = getRegion(eg, strategy, v3, 1);
+		FailoverRegion region51 = getRegion(eg, strategy, v5, 0);
+		FailoverRegion region52 = getRegion(eg, strategy, v5, 1);
+
+		// There should be 5 failover regions: v1, v2 and v4 share one (pipelined all-to-all),
+		// while each subtask of v3 and each subtask of v5 forms its own region, because those
+		// vertices are only connected through blocking exchanges.
+		assertNotNull(region1);
+		assertEquals(region1, region2);
+		assertEquals(region2, region4);
+		assertFalse(region31.equals(region32));
+		assertFalse(region51.equals(region52));
+		assertFalse(region31.equals(region4));
+		assertFalse(region51.equals(region4));
+		assertFalse(region31.equals(region51));
+	}
+
+	/**
+	 * Tests that a single pipelined path is enough to merge vertices into one region, even
+	 * if they are also connected through a blocking exchange. The JobGraph has the following
+	 * form, where {@code |} marks a blocking exchange:
+	 *
+	 * <pre>
+	 *  v1--->v2-->\
+	 *              \
+	 *               v4 --->|\
+	 *        ----->/        \
+	 *  v3-->/                v5
+	 *       \               /
+	 *        ------------->/
+	 * </pre>
+	 */
+	@Test
+	public void testSingleRegionWithMixedInput() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 3);
+		JobVertex v2 = createVertex("vertex2", 2);
+		JobVertex v3 = createVertex("vertex3", 2);
+		JobVertex v4 = createVertex("vertex4", 5);
+		JobVertex v5 = createVertex("vertex5", 2);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4, v5);
+		RestartPipelinedRegionStrategy strategy = getStrategy(eg);
+
+		FailoverRegion region1 = getRegion(eg, strategy, v1, 1);
+		FailoverRegion region2 = getRegion(eg, strategy, v2, 0);
+		FailoverRegion region4 = getRegion(eg, strategy, v4, 3);
+		FailoverRegion region3 = getRegion(eg, strategy, v3, 0);
+		FailoverRegion region5 = getRegion(eg, strategy, v5, 1);
+
+		// All in one failover region: v5 reaches the others through its pipelined input from v3
+		assertNotNull(region1);
+		assertEquals(region1, region2);
+		assertEquals(region2, region4);
+		assertEquals(region3, region2);
+		assertEquals(region1, region5);
+	}
+
+	/**
+	 * Tests region building with a pointwise (not all-to-all) pipelined exchange. The JobGraph
+	 * has the following form, where {@code |} marks a blocking exchange and v1 -> v2 is
+	 * a pointwise exchange:
+	 *
+	 * <pre>
+	 *  v1-->v2-->|\
+	 *              \
+	 *               v4
+	 *              /
+	 *  v3------>|/
+	 * </pre>
+	 */
+	@Test
+	public void testMultiRegionNotAllToAll() throws Exception {
+		JobVertex v1 = createVertex("vertex1", 2);
+		JobVertex v2 = createVertex("vertex2", 2);
+		JobVertex v3 = createVertex("vertex3", 5);
+		JobVertex v4 = createVertex("vertex4", 5);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		ExecutionGraph eg = createExecutionGraph(v1, v2, v3, v4);
+		RestartPipelinedRegionStrategy strategy = getStrategy(eg);
+
+		FailoverRegion region11 = getRegion(eg, strategy, v1, 0);
+		FailoverRegion region12 = getRegion(eg, strategy, v1, 1);
+		FailoverRegion region21 = getRegion(eg, strategy, v2, 0);
+		FailoverRegion region22 = getRegion(eg, strategy, v2, 1);
+		FailoverRegion region3 = getRegion(eg, strategy, v3, 0);
+		FailoverRegion region4 = getRegion(eg, strategy, v4, 3);
+
+		// The pointwise pipelined exchange pairs v1[i] with v2[i], giving two regions for
+		// v1/v2. Because v3 and v4 are only connected through blocking exchanges, every
+		// subtask of v3 and every subtask of v4 forms its own region.
+		assertNotNull(region11);
+		assertEquals(region11, region21);
+		assertEquals(region12, region22);
+		assertFalse(region11.equals(region12));
+		assertFalse(region3.equals(region4));
+		assertFalse(region21.equals(region4));
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a job vertex with the given name and parallelism that uses
+	 * {@link AbstractInvokable} as its invokable class.
+	 */
+	private static JobVertex createVertex(String name, int parallelism) {
+		JobVertex vertex = new JobVertex(name);
+		vertex.setParallelism(parallelism);
+		vertex.setInvokableClass(AbstractInvokable.class);
+		return vertex;
+	}
+
+	/**
+	 * Builds an ExecutionGraph configured with a {@link NoRestartStrategy} and the
+	 * {@link RestartPipelinedRegionStrategy} failover strategy, then attaches the given vertices.
+	 *
+	 * @param vertices the job vertices, in topological order (producers before their consumers)
+	 * @return the execution graph with all vertices attached
+	 * @throws Exception if the graph cannot be created or the vertices cannot be attached
+	 */
+	private static ExecutionGraph createExecutionGraph(JobVertex... vertices) throws Exception {
+		final ExecutionGraph eg = new ExecutionGraph(
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			new JobID(),
+			"Test Job Sample Name",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			new RestartPipelinedRegionStrategy.Factory(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			new Scheduler(TestingUtils.defaultExecutor()),
+			ExecutionGraph.class.getClassLoader());
+
+		// Do not catch the exception: letting it propagate makes JUnit report the full cause
+		List<JobVertex> ordered = new ArrayList<>(Arrays.asList(vertices));
+		eg.attachJobGraph(ordered);
+		return eg;
+	}
+
+	/**
+	 * Returns the graph's failover strategy. {@link #createExecutionGraph(JobVertex...)} always
+	 * configures it as a {@link RestartPipelinedRegionStrategy}.
+	 */
+	private static RestartPipelinedRegionStrategy getStrategy(ExecutionGraph eg) {
+		return (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();
+	}
+
+	/**
+	 * Returns the failover region that contains the given subtask of the given job vertex.
+	 */
+	private static FailoverRegion getRegion(
+			ExecutionGraph eg,
+			RestartPipelinedRegionStrategy strategy,
+			JobVertex vertex,
+			int subtaskIndex) {
+
+		ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
+		return strategy.getFailoverRegion(ejv.getTaskVertices()[subtaskIndex]);
+	}
+}