Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/10 07:41:26 UTC

[flink] branch master updated (f3252c4 -> 0b7f703)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f3252c4  [FLINK-9445][scala] Scala-shell uses JAVA_RUN
     new 6a793e8  [FLINK-12369][runtime] Add Failover interfaces
     new 0b7f703  [FLINK-12369][runtime] Port RestartPipelinedRegionStrategy

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../failover/flip1/FailoverEdge.java               |  57 +++
 .../failover/flip1/FailoverRegion.java             |  67 +++
 .../failover/flip1/FailoverStrategy.java           |  19 +-
 .../failover/flip1/FailoverTopology.java           |  25 +-
 .../failover/flip1/FailoverVertex.java             |  38 +-
 .../flip1/RestartPipelinedRegionStrategy.java      | 214 ++++++++
 ...RestartPipelinedRegionStrategyBuildingTest.java | 544 +++++++++++++++++++++
 .../flip1/RestartPipelinedRegionStrategyTest.java  |  66 +++
 .../failover/flip1/TestFailoverTopology.java       | 161 ++++++
 9 files changed, 1158 insertions(+), 33 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverEdge.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentFactory.java => flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java (59%)
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UdfStreamOperatorFactory.java => flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverTopology.java (60%)
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UdfStreamOperatorFactory.java => flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverVertex.java (53%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java


[flink] 01/02: [FLINK-12369][runtime] Add Failover interfaces

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a793e84afa762094c18fce57fe9e343df43c66c
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Thu May 9 10:41:54 2019 +0200

    [FLINK-12369][runtime] Add Failover interfaces
---
 .../failover/flip1/FailoverEdge.java               | 57 ++++++++++++++++++++++
 .../failover/flip1/FailoverStrategy.java           | 37 ++++++++++++++
 .../failover/flip1/FailoverTopology.java           | 39 +++++++++++++++
 .../failover/flip1/FailoverVertex.java             | 54 ++++++++++++++++++++
 4 files changed, 187 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverEdge.java
new file mode 100644
index 0000000..73dac88
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverEdge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+/**
+ * A connection between {@link FailoverVertex FailoverVertices}.
+ *
+ * <p>producer -> ResultPartition -> consumer
+ */
+public interface FailoverEdge {
+
+	/**
+	 * Returns the ID of the result partition that the source produces.
+	 *
+	 * @return ID of the result partition that the source produces
+	 */
+	IntermediateResultPartitionID getResultPartitionID();
+
+	/**
+	 * Returns the {@link ResultPartitionType} of the produced result partition.
+	 *
+	 * @return type of the produced result partition
+	 */
+	ResultPartitionType getResultPartitionType();
+
+	/**
+	 * Returns the source vertex, i.e., the producer of the result partition.
+	 *
+	 * @return source vertex
+	 */
+	FailoverVertex getSourceVertex();
+
+	/**
+	 * Returns the target vertex, i.e., the consumer of the result partition.
+	 *
+	 * @return target vertex
+	 */
+	FailoverVertex getTargetVertex();
+}
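
For orientation, and purely as an illustration (these helpers are hypothetical and not part of the commit above), an edge ties the producer of a result partition to its consumer, and the partition type tells a failover strategy whether the exchange is pipelined:

    // Hypothetical helpers, for illustration only (same package/imports as FailoverEdge above).
    static String describeEdge(FailoverEdge edge) {
        return edge.getSourceVertex().getExecutionVertexName()
            + " --(" + edge.getResultPartitionType() + ")--> "
            + edge.getTargetVertex().getExecutionVertexName();
    }

    static boolean isPipelinedExchange(FailoverEdge edge) {
        // pipelined partitions are consumed while they are still being produced
        return edge.getResultPartitionType().isPipelined();
    }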
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
new file mode 100644
index 0000000..2fd4ce7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Set;
+
+/**
+ * Interface for strategies that decide which tasks need to be restarted when a task failure occurs.
+ */
+public interface FailoverStrategy {
+
+	/**
+	 * Returns a set of IDs corresponding to the set of vertices that should be restarted.
+	 *
+	 * @param executionVertexId ID of the failed task
+	 * @param cause cause of the failure
+	 * @return set of IDs of vertices to restart
+	 */
+	Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause);
+}
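
For reference, a minimal sketch of an implementation (hypothetical, not part of this change set; assuming the same package and imports as the interface above) would ignore the failure cause and propose restarting nothing but the failed vertex itself:

    // Hypothetical strategy, for illustration only: restart just the failed vertex.
    public class RestartFailedVertexOnlyStrategy implements FailoverStrategy {

        @Override
        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
            // the failure cause is ignored; only the failed vertex is proposed for restart
            return java.util.Collections.singleton(executionVertexId);
        }
    }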
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverTopology.java
new file mode 100644
index 0000000..18f48fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverTopology.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Represents the topology a failover strategy operates on, i.e. the failover vertices and the edges connecting them.
+ */
+public interface FailoverTopology {
+
+	/**
+	 * Returns an iterable over all vertices, topologically sorted.
+	 *
+	 * @return topologically sorted iterable over all vertices
+	 */
+	Iterable<? extends FailoverVertex> getFailoverVertices();
+
+	/**
+	 * Returns whether the topology contains co-location constraints.
+	 * Co-location constraints are currently used for iterations.
+	 *
+	 * @return whether the topology contains co-location constraints
+	 */
+	boolean containsCoLocationConstraints();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverVertex.java
new file mode 100644
index 0000000..7c1497a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverVertex.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * Represents an ExecutionVertex.
+ */
+public interface FailoverVertex {
+
+	/**
+	 * Returns the ID of this vertex.
+	 *
+	 * @return ID of this vertex
+	 */
+	ExecutionVertexID getExecutionVertexID();
+
+	/**
+	 * Returns the name of this vertex.
+	 *
+	 * @return name of this vertex
+	 */
+	String getExecutionVertexName();
+
+	/**
+	 * Returns all input edges of this vertex.
+	 *
+	 * @return input edges of this vertex
+	 */
+	Iterable<? extends FailoverEdge> getInputEdges();
+
+	/**
+	 * Returns all output edges of this vertex.
+	 *
+	 * @return output edges of this vertex
+	 */
+	Iterable<? extends FailoverEdge> getOutputEdges();
+}
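
Taken together, FailoverTopology, FailoverVertex and FailoverEdge let a strategy walk the graph. A hedged sketch (helper name invented for illustration; same imports as above plus java.util.Map and java.util.HashMap) that counts the pipelined inputs of each vertex:

    // Illustration only: for every vertex, count how many of its input edges are pipelined.
    static Map<ExecutionVertexID, Integer> countPipelinedInputs(FailoverTopology topology) {
        Map<ExecutionVertexID, Integer> counts = new HashMap<>();
        for (FailoverVertex vertex : topology.getFailoverVertices()) {
            int pipelined = 0;
            for (FailoverEdge edge : vertex.getInputEdges()) {
                if (edge.getResultPartitionType().isPipelined()) {
                    pipelined++;
                }
            }
            counts.put(vertex.getExecutionVertexID(), pipelined);
        }
        return counts;
    }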


[flink] 02/02: [FLINK-12369][runtime] Port RestartPipelinedRegionStrategy

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b7f7032068451538bda392814d8ea6128d1816a
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Thu May 9 10:42:31 2019 +0200

    [FLINK-12369][runtime] Port RestartPipelinedRegionStrategy
---
 .../failover/flip1/FailoverRegion.java             |  67 +++
 .../flip1/RestartPipelinedRegionStrategy.java      | 214 ++++++++
 ...RestartPipelinedRegionStrategyBuildingTest.java | 544 +++++++++++++++++++++
 .../flip1/RestartPipelinedRegionStrategyTest.java  |  66 +++
 .../failover/flip1/TestFailoverTopology.java       | 161 ++++++
 5 files changed, 1052 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
new file mode 100644
index 0000000..88a7658
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion is a subset of all the vertices in the job topology.
+ */
+public class FailoverRegion {
+
+	/** All vertices in this region. */
+	private final Map<ExecutionVertexID, FailoverVertex> executionVertices;
+
+	/**
+	 * Creates a new failover region containing a set of vertices.
+	 *
+	 * @param executionVertices to be contained in this region
+	 */
+	public FailoverRegion(Collection<? extends FailoverVertex> executionVertices) {
+		checkNotNull(executionVertices);
+		this.executionVertices = new HashMap<>();
+		executionVertices.forEach(v -> this.executionVertices.put(v.getExecutionVertexID(), v));
+	}
+
+	/**
+	 * Returns IDs of all vertices in this region.
+	 *
+	 * @return IDs of all vertices in this region
+	 */
+	public Set<ExecutionVertexID> getAllExecutionVertexIDs() {
+		return executionVertices.keySet();
+	}
+
+	/**
+	 * Returns all vertices in this region.
+	 *
+	 * @return all vertices in this region
+	 */
+	public Set<FailoverVertex> getAllExecutionVertices() {
+		return new HashSet<>(executionVertices.values());
+	}
+}
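
Usage of the region container is straightforward; a hedged snippet (illustration only, with the test vertices borrowed from the TestFailoverTopology utility added further down in this email):

    // Illustration only: group two test vertices into one region and read back their IDs.
    TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
    TestFailoverTopology.TestFailoverVertex v1 = builder.newVertex();
    TestFailoverTopology.TestFailoverVertex v2 = builder.newVertex();

    FailoverRegion region = new FailoverRegion(java.util.Arrays.asList(v1, v2));
    Set<ExecutionVertexID> ids = region.getAllExecutionVertexIDs(); // contains the IDs of v1 and v2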
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
new file mode 100644
index 0000000..e7040c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A failover strategy that proposes to restart involved regions when a vertex fails.
+ * A region is defined by this strategy as tasks that communicate via pipelined data exchange.
+ */
+public class RestartPipelinedRegionStrategy implements FailoverStrategy {
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);
+
+	/** The topology containing info about all the vertices and edges. */
+	private final FailoverTopology topology;
+
+	/** Maps execution vertex id to failover region. */
+	private final Map<ExecutionVertexID, FailoverRegion> regions;
+
+	/**
+	 * Creates a new failover strategy that restarts pipelined regions, based on the given topology.
+	 *
+	 * @param topology containing info about all the vertices and edges
+	 */
+	public RestartPipelinedRegionStrategy(FailoverTopology topology) {
+		this.topology = checkNotNull(topology);
+		this.regions = new HashMap<>();
+
+		// build regions based on the given topology
+		LOG.info("Start building failover regions.");
+		buildFailoverRegions();
+	}
+
+	// ------------------------------------------------------------------------
+	//  region building
+	// ------------------------------------------------------------------------
+
+	private void buildFailoverRegions() {
+		// currently, a job with co-location constraints fails over as one single region;
+		// grouping only the co-located vertices into a shared region is a possible future improvement
+		if (topology.containsCoLocationConstraints()) {
+			buildOneRegionForAllVertices();
+			return;
+		}
+
+		// maps each vertex to the region (vertex set) it currently belongs to;
+		// an IdentityHashMap is used so that lookups rely on cheap reference equality
+		final IdentityHashMap<FailoverVertex, HashSet<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
+
+		// iterate all the vertices which are topologically sorted
+		for (FailoverVertex vertex : topology.getFailoverVertices()) {
+			HashSet<FailoverVertex> currentRegion = new HashSet<>(1);
+			currentRegion.add(vertex);
+			vertexToRegion.put(vertex, currentRegion);
+
+			for (FailoverEdge inputEdge : vertex.getInputEdges()) {
+				if (inputEdge.getResultPartitionType().isPipelined()) {
+					final FailoverVertex producerVertex = inputEdge.getSourceVertex();
+					final HashSet<FailoverVertex> producerRegion = vertexToRegion.get(producerVertex);
+
+					if (producerRegion == null) {
+						throw new IllegalStateException("Producer task " + producerVertex.getExecutionVertexName()
+							+ " failover region is null while calculating failover region for the consumer task "
+							+ vertex.getExecutionVertexName() + ". This should be a failover region building bug.");
+					}
+
+					// skip the merge if the consumer is already in the producer's region;
+					// this check significantly reduces the complexity for all-to-all PIPELINED connections
+					if (currentRegion != producerRegion) {
+						// merge current region and producer region
+						// merge the smaller region into the larger one to reduce the cost
+						final HashSet<FailoverVertex> smallerSet;
+						final HashSet<FailoverVertex> largerSet;
+						if (currentRegion.size() < producerRegion.size()) {
+							smallerSet = currentRegion;
+							largerSet = producerRegion;
+						} else {
+							smallerSet = producerRegion;
+							largerSet = currentRegion;
+						}
+						for (FailoverVertex v : smallerSet) {
+							vertexToRegion.put(v, largerSet);
+						}
+						largerSet.addAll(smallerSet);
+						currentRegion = largerSet;
+					}
+				}
+			}
+		}
+
+		// find all the distinct region sets (the IdentityHashMap imitates an IdentityHashSet, which the JDK does not provide)
+		final IdentityHashMap<HashSet<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
+		for (HashSet<FailoverVertex> regionVertices : vertexToRegion.values()) {
+			distinctRegions.put(regionVertices, null);
+		}
+
+		// create all the failover regions and register them
+		for (HashSet<FailoverVertex> regionVertices : distinctRegions.keySet()) {
+			LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
+			final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
+			for (FailoverVertex vertex : regionVertices) {
+				this.regions.put(vertex.getExecutionVertexID(), failoverRegion);
+			}
+		}
+		LOG.info("Created {} failover regions.", distinctRegions.size());
+	}
+
+	private void buildOneRegionForAllVertices() {
+		LOG.warn("Cannot decompose the topology into individual failover regions due to use of " +
+			"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
+
+		final List<FailoverVertex> allVertices = new ArrayList<>();
+		for (FailoverVertex vertex : topology.getFailoverVertices()) {
+			allVertices.add(vertex);
+		}
+
+		final FailoverRegion region = new FailoverRegion(allVertices);
+		for (FailoverVertex vertex : topology.getFailoverVertices()) {
+			regions.put(vertex.getExecutionVertexID(), region);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  task failure handling
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a set of IDs corresponding to the set of vertices that should be restarted.
+	 * In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
+	 * The 'involved' regions are calculated with rules below:
+	 * 1. The region containing the failed task is always involved
+	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 *    the region containing the partition producer task is involved
+	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 *
+	 * @param executionVertexId ID of the failed task
+	 * @param cause cause of the failure
+	 * @return set of IDs of vertices to restart
+	 */
+	@Override
+	public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
+		final FailoverRegion failedRegion = regions.get(executionVertexId);
+		if (failedRegion == null) {
+			// TODO: show the task name in the log
+			throw new IllegalStateException("Cannot find the failover region for task " + executionVertexId, cause);
+		}
+
+		// TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+
+		return getRegionsToRestart(failedRegion).stream().flatMap(
+			r -> r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+	}
+
+	/**
+	 * All 'involved' regions are proposed to be restarted.
+	 * The 'involved' regions are calculated with rules below:
+	 * 1. The region containing the failed task is always involved
+	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 *    the region containing the partition producer task is involved
+	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 */
+	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion regionToRestart) {
+		return Collections.singleton(regionToRestart);
+
+		// TODO: implement backtracking logic
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the failover region that contains the given execution vertex.
+	 *
+	 * @return the failover region that contains the given execution vertex
+	 */
+	@VisibleForTesting
+	FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
+		return regions.get(vertexID);
+	}
+}
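
A hedged end-to-end sketch of the behaviour as of this commit (illustration only; imports elided; the TestFailoverTopology builder comes from the tests below): with a1 -> b1 pipelined and b1 -> c1 blocking, a failure of a1 currently proposes restarting only {a1, b1}, because the TODO rules 2 and 3 in the Javadoc above are not yet implemented:

    // Illustration only, not part of the commit.
    TestFailoverTopology.Builder builder = new TestFailoverTopology.Builder();
    TestFailoverTopology.TestFailoverVertex a1 = builder.newVertex();
    TestFailoverTopology.TestFailoverVertex b1 = builder.newVertex();
    TestFailoverTopology.TestFailoverVertex c1 = builder.newVertex();
    builder
        .connect(a1, b1, ResultPartitionType.PIPELINED)
        .connect(b1, c1, ResultPartitionType.BLOCKING);

    RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(builder.build());

    // a1 and b1 share a pipelined region; c1 sits behind a blocking edge,
    // so only the IDs of a1 and b1 are returned.
    Set<ExecutionVertexID> toRestart =
        strategy.getTasksNeedingRestart(a1.getExecutionVertexID(), new Exception("test"));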
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
new file mode 100644
index 0000000..81f5e38
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the failover region building logic of the {@link RestartPipelinedRegionStrategy}.
+ */
+public class RestartPipelinedRegionStrategyBuildingTest extends TestLogger {
+
+	/**
+	 * Tests that a graph of individual, unconnected vertices is handled correctly.
+	 *
+	 * <pre>
+	 *     (v1)
+	 *
+	 *     (v2)
+	 *
+	 *     (v3)
+	 * </pre>
+	 */
+	@Test
+	public void testIndividualVertices() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
+		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
+		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
+
+		assertDistinctRegions(r1, r2, r3);
+	}
+
+	/**
+	 * Tests that embarrassingly parallel chains of vertices are handled correctly.
+	 *
+	 * <pre>
+	 *     (a1) --> (b1)
+	 *
+	 *     (a2) --> (b2)
+	 *
+	 *     (a3) --> (b3)
+	 * </pre>
+	 */
+	@Test
+	public void testEmbarrassinglyParallelCase() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex va1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb3 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(va1, vb1, ResultPartitionType.PIPELINED)
+			.connect(va2, vb2, ResultPartitionType.PIPELINED)
+			.connect(va3, vb3, ResultPartitionType.PIPELINED);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion ra1 = strategy.getFailoverRegion(va1.getExecutionVertexID());
+		FailoverRegion ra2 = strategy.getFailoverRegion(va2.getExecutionVertexID());
+		FailoverRegion ra3 = strategy.getFailoverRegion(va3.getExecutionVertexID());
+		FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getExecutionVertexID());
+		FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getExecutionVertexID());
+		FailoverRegion rb3 = strategy.getFailoverRegion(vb3.getExecutionVertexID());
+
+		assertSameRegion(ra1, rb1);
+		assertSameRegion(ra2, rb2);
+		assertSameRegion(ra3, rb3);
+
+		assertDistinctRegions(ra1, ra2, ra3);
+	}
+
+	/**
+	 * Tests that a single pipelined component formed by a sequence of all-to-all
+	 * connections is handled correctly.
+	 *
+	 * <pre>
+	 *     (a1) -+-> (b1) -+-> (c1)
+	 *           X         X
+	 *     (a2) -+-> (b2) -+-> (c2)
+	 * </pre>
+	 */
+	@Test
+	public void testOneComponentViaTwoExchanges() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex va1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vc1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vc2 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(va1, vb1, ResultPartitionType.PIPELINED)
+			.connect(va1, vb2, ResultPartitionType.PIPELINED)
+			.connect(va2, vb1, ResultPartitionType.PIPELINED)
+			.connect(va2, vb2, ResultPartitionType.PIPELINED)
+			.connect(vb1, vc1, ResultPartitionType.PIPELINED)
+			.connect(vb1, vc2, ResultPartitionType.PIPELINED)
+			.connect(vb2, vc1, ResultPartitionType.PIPELINED)
+			.connect(vb2, vc2, ResultPartitionType.PIPELINED);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion ra1 = strategy.getFailoverRegion(va1.getExecutionVertexID());
+		FailoverRegion ra2 = strategy.getFailoverRegion(va2.getExecutionVertexID());
+		FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getExecutionVertexID());
+		FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getExecutionVertexID());
+		FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getExecutionVertexID());
+		FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getExecutionVertexID());
+
+		assertSameRegion(ra1, ra2, rb1, rb2, rc1, rc2);
+	}
+
+	/**
+	 * Tests that a single pipelined component formed by a cascade of joins
+	 * is handled correctly.
+	 *
+	 * <pre>
+	 *     (v1)--+
+	 *          +--(v5)-+
+	 *     (v2)--+      |
+	 *                 +--(v7)
+	 *     (v3)--+      |
+	 *          +--(v6)-+
+	 *     (v4)--+
+	 * </pre>
+	 */
+	@Test
+	public void testOneComponentViaCascadeOfJoins() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v7 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(v1, v5, ResultPartitionType.PIPELINED)
+			.connect(v2, v5, ResultPartitionType.PIPELINED)
+			.connect(v3, v6, ResultPartitionType.PIPELINED)
+			.connect(v4, v6, ResultPartitionType.PIPELINED)
+			.connect(v5, v7, ResultPartitionType.PIPELINED)
+			.connect(v6, v7, ResultPartitionType.PIPELINED);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
+		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
+		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
+		FailoverRegion r4 = strategy.getFailoverRegion(v4.getExecutionVertexID());
+		FailoverRegion r5 = strategy.getFailoverRegion(v5.getExecutionVertexID());
+		FailoverRegion r6 = strategy.getFailoverRegion(v6.getExecutionVertexID());
+		FailoverRegion r7 = strategy.getFailoverRegion(v7.getExecutionVertexID());
+
+		assertSameRegion(r1, r2, r3, r4, r5, r6, r7);
+	}
+
+	/**
+	 * Tests that a single pipelined component fanning out from one source
+	 * is handled correctly.
+	 *
+	 * <pre>
+	 *                 +--(v4)
+	 *          +--(v2)-+
+	 *          |      +--(v5)
+	 *     (v1)--+
+	 *          |      +--(v6)
+	 *          +--(v3)-+
+	 *                 +--(v7)
+	 * </pre>
+	 */
+	@Test
+	public void testOneComponentInstanceFromOneSource() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v7 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(v1, v2, ResultPartitionType.PIPELINED)
+			.connect(v1, v3, ResultPartitionType.PIPELINED)
+			.connect(v2, v4, ResultPartitionType.PIPELINED)
+			.connect(v2, v5, ResultPartitionType.PIPELINED)
+			.connect(v3, v6, ResultPartitionType.PIPELINED)
+			.connect(v3, v7, ResultPartitionType.PIPELINED);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
+		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
+		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
+		FailoverRegion r4 = strategy.getFailoverRegion(v4.getExecutionVertexID());
+		FailoverRegion r5 = strategy.getFailoverRegion(v5.getExecutionVertexID());
+		FailoverRegion r6 = strategy.getFailoverRegion(v6.getExecutionVertexID());
+		FailoverRegion r7 = strategy.getFailoverRegion(v7.getExecutionVertexID());
+
+		assertSameRegion(r1, r2, r3, r4, r5, r6, r7);
+	}
+
+	/**
+	 * Tests the below topology.
+	 * <pre>
+	 *     (a1) -+-> (b1) -+-> (c1)
+	 *           X
+	 *     (a2) -+-> (b2) -+-> (c2)
+	 *
+	 *           ^         ^
+	 *           |         |
+	 *     (pipelined) (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testTwoComponentsViaBlockingExchange() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex va1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vc1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vc2 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(va1, vb1, ResultPartitionType.PIPELINED)
+			.connect(va1, vb2, ResultPartitionType.PIPELINED)
+			.connect(va2, vb1, ResultPartitionType.PIPELINED)
+			.connect(va2, vb2, ResultPartitionType.PIPELINED)
+			.connect(vb1, vc1, ResultPartitionType.BLOCKING)
+			.connect(vb2, vc2, ResultPartitionType.BLOCKING);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion ra1 = strategy.getFailoverRegion(va1.getExecutionVertexID());
+		FailoverRegion ra2 = strategy.getFailoverRegion(va2.getExecutionVertexID());
+		FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getExecutionVertexID());
+		FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getExecutionVertexID());
+		FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getExecutionVertexID());
+		FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getExecutionVertexID());
+
+		assertSameRegion(ra1, ra2, rb1, rb2);
+
+		assertDistinctRegions(ra1, rc1, rc2);
+	}
+
+	/**
+	 * Tests the below topology.
+	 * <pre>
+	 *     (a1) -+-> (b1) -+-> (c1)
+	 *           X         X
+	 *     (a2) -+-> (b2) -+-> (c2)
+	 *
+	 *           ^         ^
+	 *           |         |
+	 *     (pipelined) (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testTwoComponentsViaBlockingExchange2() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex va1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vc1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vc2 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(va1, vb1, ResultPartitionType.PIPELINED)
+			.connect(va1, vb2, ResultPartitionType.PIPELINED)
+			.connect(va2, vb1, ResultPartitionType.PIPELINED)
+			.connect(va2, vb2, ResultPartitionType.PIPELINED)
+			.connect(vb1, vc1, ResultPartitionType.BLOCKING)
+			.connect(vb1, vc2, ResultPartitionType.BLOCKING)
+			.connect(vb2, vc1, ResultPartitionType.BLOCKING)
+			.connect(vb2, vc2, ResultPartitionType.BLOCKING);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion ra1 = strategy.getFailoverRegion(va1.getExecutionVertexID());
+		FailoverRegion ra2 = strategy.getFailoverRegion(va2.getExecutionVertexID());
+		FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getExecutionVertexID());
+		FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getExecutionVertexID());
+		FailoverRegion rc1 = strategy.getFailoverRegion(vc1.getExecutionVertexID());
+		FailoverRegion rc2 = strategy.getFailoverRegion(vc2.getExecutionVertexID());
+
+		assertSameRegion(ra1, ra2, rb1, rb2);
+
+		assertDistinctRegions(ra1, rc1, rc2);
+	}
+
+	/**
+	 * Cascades of joins with partially blocking, partially pipelined exchanges.
+	 * <pre>
+	 *     (1)--+
+	 *          +--(5)-+
+	 *     (2)--+      |
+	 *              (blocking)
+	 *                 |
+	 *                 +--(7)
+	 *                 |
+	 *              (blocking)
+	 *     (3)--+      |
+	 *          +--(6)-+
+	 *     (4)--+
+	 * </pre>
+	 *
+	 * <p>Component 1: 1, 2, 5; component 2: 3, 4, 6; component 3: 7
+	 */
+	@Test
+	public void testMultipleComponentsViaCascadeOfJoins() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v7 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(v1, v5, ResultPartitionType.PIPELINED)
+			.connect(v2, v5, ResultPartitionType.PIPELINED)
+			.connect(v3, v6, ResultPartitionType.PIPELINED)
+			.connect(v4, v6, ResultPartitionType.PIPELINED)
+			.connect(v5, v7, ResultPartitionType.BLOCKING)
+			.connect(v6, v7, ResultPartitionType.BLOCKING);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
+		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
+		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
+		FailoverRegion r4 = strategy.getFailoverRegion(v4.getExecutionVertexID());
+		FailoverRegion r5 = strategy.getFailoverRegion(v5.getExecutionVertexID());
+		FailoverRegion r6 = strategy.getFailoverRegion(v6.getExecutionVertexID());
+		FailoverRegion r7 = strategy.getFailoverRegion(v7.getExecutionVertexID());
+
+		assertSameRegion(r1, r2, r5);
+		assertSameRegion(r3, r4, r6);
+
+		assertDistinctRegions(r1, r3, r7);
+	}
+
+	/**
+	 * Tests the below topology.
+	 * <pre>
+	 *       (blocking)
+	 *           |
+	 *           v
+	 *          +|-(v2)-+
+	 *          |       |
+	 *     (v1)--+      +--(v4)
+	 *          |       |
+	 *          +--(v3)-+
+	 * </pre>
+	 */
+	@Test
+	public void testDiamondWithMixedPipelinedAndBlockingExchanges() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(v1, v2, ResultPartitionType.BLOCKING)
+			.connect(v1, v3, ResultPartitionType.PIPELINED)
+			.connect(v2, v4, ResultPartitionType.PIPELINED)
+			.connect(v3, v4, ResultPartitionType.PIPELINED);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
+		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
+		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
+		FailoverRegion r4 = strategy.getFailoverRegion(v4.getExecutionVertexID());
+
+		assertSameRegion(r1, r2, r3, r4);
+	}
+
+	/**
+	 * This test checks that strictly co-located vertices end up in the same failover region,
+	 * even though they are only connected via a blocking pattern.
+	 * This is currently an assumption / limitation of the scheduler.
+	 * <pre>
+	 *     (a1) -+-> (b1)
+	 *           X
+	 *     (a2) -+-> (b2)
+	 *
+	 *           ^
+	 *           |
+	 *       (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testBlockingAllToAllTopologyWithCoLocation() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex va1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb2 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(va1, vb1, ResultPartitionType.BLOCKING)
+			.connect(va1, vb2, ResultPartitionType.BLOCKING)
+			.connect(va2, vb1, ResultPartitionType.BLOCKING)
+			.connect(va2, vb2, ResultPartitionType.BLOCKING);
+
+		topologyBuilder.setContainsCoLocationConstraints(true);
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion ra1 = strategy.getFailoverRegion(va1.getExecutionVertexID());
+		FailoverRegion ra2 = strategy.getFailoverRegion(va2.getExecutionVertexID());
+		FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getExecutionVertexID());
+		FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getExecutionVertexID());
+
+		assertSameRegion(ra1, ra2, rb1, rb2);
+	}
+
+	/**
+	 * This test checks that strictly co-located vertices end up in the same failover region,
+	 * even though they are not connected.
+	 * This is currently an assumption / limitation of the scheduler.
+	 * <pre>
+	 *     (a1) -+-> (b1)
+	 *
+	 *     (a2) -+-> (b2)
+	 * </pre>
+	 */
+	@Test
+	public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex va1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex va2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex vb2 = topologyBuilder.newVertex();
+
+		topologyBuilder
+			.connect(va1, vb1, ResultPartitionType.PIPELINED)
+			.connect(va2, vb2, ResultPartitionType.PIPELINED);
+
+		topologyBuilder.setContainsCoLocationConstraints(true);
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion ra1 = strategy.getFailoverRegion(va1.getExecutionVertexID());
+		FailoverRegion ra2 = strategy.getFailoverRegion(va2.getExecutionVertexID());
+		FailoverRegion rb1 = strategy.getFailoverRegion(vb1.getExecutionVertexID());
+		FailoverRegion rb2 = strategy.getFailoverRegion(vb2.getExecutionVertexID());
+
+		assertSameRegion(ra1, ra2, rb1, rb2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static void assertSameRegion(FailoverRegion ...regions) {
+		checkNotNull(regions);
+		for (int i = 0; i < regions.length; i++) {
+			for (int j = i + 1; j < regions.length; j++) {
+				assertSame(regions[i], regions[j]);
+			}
+		}
+	}
+
+	private static void assertDistinctRegions(FailoverRegion ...regions) {
+		checkNotNull(regions);
+		for (int i = 0; i < regions.length; i++) {
+			for (int j = i + 1; j < regions.length; j++) {
+				assertNotSame(regions[i], regions[j]);
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
new file mode 100644
index 0000000..9a3f595
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the failure handling logic of the {@link RestartPipelinedRegionStrategy}.
+ */
+public class RestartPipelinedRegionStrategyTest extends TestLogger {
+
+	/**
+	 * Tests the scenario where a task fails due to an error of its own, in which case only the
+	 * region containing the failed task should be restarted.
+	 * <pre>
+	 *     (v1)
+	 *
+	 *     (v2)
+	 *
+	 *     (v3)
+	 * </pre>
+	 */
+	@Test
+	public void testRegionFailoverForTaskInternalErrors() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
+		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
+		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
+
+		assertEquals(r1.getAllExecutionVertexIDs(),
+			strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")));
+		assertEquals(r2.getAllExecutionVertexIDs(),
+			strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")));
+		assertEquals(r3.getAllExecutionVertexIDs(),
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
new file mode 100644
index 0000000..9daec8e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * A FailoverTopology implementation for tests.
+ */
+public class TestFailoverTopology implements FailoverTopology {
+
+	private final Collection<FailoverVertex> vertices;
+	private final boolean containsCoLocationConstraints;
+
+	public TestFailoverTopology(Collection<FailoverVertex> vertices, boolean containsCoLocationConstraints) {
+		this.vertices = vertices;
+		this.containsCoLocationConstraints = containsCoLocationConstraints;
+	}
+
+	@Override
+	public Iterable<? extends FailoverVertex> getFailoverVertices() {
+		return vertices::iterator;
+	}
+
+	@Override
+	public boolean containsCoLocationConstraints() {
+		return containsCoLocationConstraints;
+	}
+
+	public static class TestFailoverVertex implements FailoverVertex {
+
+		private final Collection<FailoverEdge> inputEdges = new ArrayList<>();
+		private final Collection<FailoverEdge> outputEdges = new ArrayList<>();
+		private final ExecutionVertexID id;
+		private final String name;
+
+		public TestFailoverVertex(ExecutionVertexID id, String name) {
+			this.id = id;
+			this.name = name;
+		}
+
+		void addInputEdge(FailoverEdge edge) {
+			inputEdges.add(edge);
+		}
+
+		void addOutputEdge(FailoverEdge edge) {
+			outputEdges.add(edge);
+		}
+
+		public ExecutionVertexID getExecutionVertexID() {
+			return id;
+		}
+
+		@Override
+		public String getExecutionVertexName() {
+			return name;
+		}
+
+		@Override
+		public Iterable<? extends FailoverEdge> getInputEdges() {
+			return inputEdges::iterator;
+		}
+
+		@Override
+		public Iterable<? extends FailoverEdge> getOutputEdges() {
+			return outputEdges::iterator;
+		}
+	}
+
+	public static class TestFailoverEdge implements FailoverEdge {
+
+		private final IntermediateResultPartitionID resultPartitionID;
+		private final ResultPartitionType resultPartitionType;
+		private final FailoverVertex sourceVertex;
+		private final FailoverVertex targetVertex;
+
+		public TestFailoverEdge(IntermediateResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, FailoverVertex sourceVertex, FailoverVertex targetVertex) {
+			this.resultPartitionID = resultPartitionID;
+			this.resultPartitionType = resultPartitionType;
+			this.sourceVertex = sourceVertex;
+			this.targetVertex = targetVertex;
+		}
+
+		@Override
+		public IntermediateResultPartitionID getResultPartitionID() {
+			return resultPartitionID;
+		}
+
+		@Override
+		public ResultPartitionType getResultPartitionType() {
+			return resultPartitionType;
+		}
+
+		@Override
+		public FailoverVertex getSourceVertex() {
+			return sourceVertex;
+		}
+
+		@Override
+		public FailoverVertex getTargetVertex() {
+			return targetVertex;
+		}
+	}
+
+	public static class Builder {
+		private boolean containsCoLocationConstraints = false;
+		private Collection<FailoverVertex> vertices = new ArrayList<>();
+
+		public TestFailoverVertex newVertex() {
+			// delegate to newVertex(String), which already registers the vertex;
+			// adding it here again would duplicate it in the topology
+			return newVertex(UUID.randomUUID().toString());
+		}
+
+		public TestFailoverVertex newVertex(String name) {
+			TestFailoverVertex testFailoverVertex = new TestFailoverVertex(new ExecutionVertexID(new JobVertexID(), 0), name);
+			vertices.add(testFailoverVertex);
+			return testFailoverVertex;
+		}
+
+		public Builder connect(TestFailoverVertex source, TestFailoverVertex target, ResultPartitionType partitionType) {
+			FailoverEdge edge = new TestFailoverEdge(new IntermediateResultPartitionID(), partitionType, source, target);
+			source.addOutputEdge(edge);
+			target.addInputEdge(edge);
+
+			return this;
+		}
+
+		public Builder setContainsCoLocationConstraints(boolean containsCoLocationConstraints) {
+			this.containsCoLocationConstraints = containsCoLocationConstraints;
+			return this;
+		}
+
+		public FailoverTopology build() {
+			return new TestFailoverTopology(vertices, containsCoLocationConstraints);
+		}
+	}
+}