You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/28 07:38:05 UTC

[flink] branch master updated: [FLINK-12068][runtime] Implement region backtracking for region failover strategy

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 58987dd  [FLINK-12068][runtime] Implement region backtracking for region failover strategy
58987dd is described below

commit 58987dd16c7e8af36e935e811f716d2f843de5ca
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Wed May 15 18:38:37 2019 +0800

    [FLINK-12068][runtime] Implement region backtracking for region failover strategy
---
 .../failover/flip1/FailoverRegion.java             |  20 +-
 .../flip1/RestartPipelinedRegionStrategy.java      | 164 +++++++--
 .../flip1/ResultPartitionAvailabilityChecker.java  |  35 ++
 .../flip1/RestartPipelinedRegionStrategyTest.java  | 388 ++++++++++++++++++++-
 .../failover/flip1/TestFailoverTopology.java       |  12 +-
 5 files changed, 569 insertions(+), 50 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
index 88a7658..9a013df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverRegion.java
@@ -20,10 +20,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
 
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -33,18 +30,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class FailoverRegion {
 
+	/** All vertex IDs in this region. */
+	private final Set<ExecutionVertexID> executionVertexIDs;
+
 	/** All vertices in this region. */
-	private final Map<ExecutionVertexID, FailoverVertex> executionVertices;
+	private final Set<FailoverVertex> executionVertices;
 
 	/**
 	 * Creates a new failover region containing a set of vertices.
 	 *
 	 * @param executionVertices to be contained in this region
 	 */
-	public FailoverRegion(Collection<? extends FailoverVertex> executionVertices) {
-		checkNotNull(executionVertices);
-		this.executionVertices = new HashMap<>();
-		executionVertices.forEach(v -> this.executionVertices.put(v.getExecutionVertexID(), v));
+	public FailoverRegion(Set<FailoverVertex> executionVertices) {
+		this.executionVertices = checkNotNull(executionVertices);
+		this.executionVertexIDs = new HashSet<>();
+		executionVertices.forEach(v -> this.executionVertexIDs.add(v.getExecutionVertexID()));
 	}
 
 	/**
@@ -53,7 +53,7 @@ public class FailoverRegion {
 	 * @return IDs of all vertices in this region
 	 */
 	public Set<ExecutionVertexID> getAllExecutionVertexIDs() {
-		return executionVertices.keySet();
+		return executionVertexIDs;
 	}
 
 	/**
@@ -62,6 +62,6 @@ public class FailoverRegion {
 	 * @return all vertices in this region
 	 */
 	public Set<FailoverVertex> getAllExecutionVertices() {
-		return new HashSet<>(executionVertices.values());
+		return executionVertices;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index e7040c3..8e06327 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -18,20 +18,22 @@
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -47,23 +49,46 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	/** The topology containing info about all the vertices and edges. */
 	private final FailoverTopology topology;
 
+	/** All failover regions. */
+	private final IdentityHashMap<FailoverRegion, Object> regions;
+
 	/** Maps execution vertex id to failover region. */
-	private final Map<ExecutionVertexID, FailoverRegion> regions;
+	private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
+
+	/** The checker helps to query result partition availability. */
+	private final RegionFailoverResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
 
 	/**
 	 * Creates a new failover strategy to restart pipelined regions that works on the given topology.
+	 * The result partitions are always considered to be available if no data consumption error happens.
 	 *
 	 * @param topology containing info about all the vertices and edges
 	 */
+	@VisibleForTesting
 	public RestartPipelinedRegionStrategy(FailoverTopology topology) {
+		this(topology, resultPartitionID -> true);
+	}
+
+	/**
+	 * Creates a new failover strategy to restart pipelined regions that works on the given topology.
+	 *
+	 * @param topology containing info about all the vertices and edges
+	 * @param resultPartitionAvailabilityChecker helps to query result partition availability
+	 */
+	public RestartPipelinedRegionStrategy(
+		FailoverTopology topology,
+		ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
+
 		this.topology = checkNotNull(topology);
-		this.regions = new HashMap<>();
+		this.regions = new IdentityHashMap<>();
+		this.vertexToRegionMap = new HashMap<>();
+		this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(
+			resultPartitionAvailabilityChecker);
 
 		// build regions based on the given topology
 		LOG.info("Start building failover regions.");
 		buildFailoverRegions();
 	}
-
 	// ------------------------------------------------------------------------
 	//  region building
 	// ------------------------------------------------------------------------
@@ -131,25 +156,28 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 		for (HashSet<FailoverVertex> regionVertices : distinctRegions.keySet()) {
 			LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
 			final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
+			regions.put(failoverRegion, null);
 			for (FailoverVertex vertex : regionVertices) {
-				this.regions.put(vertex.getExecutionVertexID(), failoverRegion);
+				vertexToRegionMap.put(vertex.getExecutionVertexID(), failoverRegion);
 			}
 		}
-		LOG.info("Created {} failover regions.", distinctRegions.size());
+
+		LOG.info("Created {} failover regions.", regions.size());
 	}
 
 	private void buildOneRegionForAllVertices() {
 		LOG.warn("Cannot decompose the topology into individual failover regions due to use of " +
 			"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
 
-		final List<FailoverVertex> allVertices = new ArrayList<>();
+		final Set<FailoverVertex> allVertices = new HashSet<>();
 		for (FailoverVertex vertex : topology.getFailoverVertices()) {
 			allVertices.add(vertex);
 		}
 
 		final FailoverRegion region = new FailoverRegion(allVertices);
+		regions.put(region, null);
 		for (FailoverVertex vertex : topology.getFailoverVertices()) {
-			regions.put(vertex.getExecutionVertexID(), region);
+			vertexToRegionMap.put(vertex.getExecutionVertexID(), region);
 		}
 	}
 
@@ -162,9 +190,9 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	 * In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 *
 	 * @param executionVertexId ID of the failed task
 	 * @param cause cause of the failure
@@ -172,30 +200,87 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	 */
 	@Override
 	public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
-		final FailoverRegion failedRegion = regions.get(executionVertexId);
+		LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+		final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
 		if (failedRegion == null) {
 			// TODO: show the task name in the log
 			throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
 		}
 
-		// TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+		// if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+		// so that the failover process will try to recover it
+		Optional<PartitionException> dataConsumptionException = ExceptionUtils.findThrowable(
+			cause, PartitionException.class);
+		if (dataConsumptionException.isPresent()) {
+			resultPartitionAvailabilityChecker.markResultPartitionFailed(
+				dataConsumptionException.get().getPartitionId().getPartitionId());
+		}
+
+		// calculate the tasks to restart based on the result of regions to restart
+		Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+		for (FailoverRegion region : getRegionsToRestart(failedRegion)) {
+			tasksToRestart.addAll(region.getAllExecutionVertexIDs());
+		}
 
-		return getRegionsToRestart(failedRegion).stream().flatMap(
-			r -> r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+		// the previous failed partition will be recovered. remove its failed state from the checker
+		if (dataConsumptionException.isPresent()) {
+			resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+				dataConsumptionException.get().getPartitionId().getPartitionId());
+		}
+
+		LOG.info("{} tasks should be restarted to recover the failed task {}. ", tasksToRestart.size(), executionVertexId);
+		return tasksToRestart;
 	}
 
 	/**
 	 * All 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 */
-	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion regionToRestart) {
-		return Collections.singleton(regionToRestart);
+	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
+		IdentityHashMap<FailoverRegion, Object> regionsToRestart = new IdentityHashMap<>();
+		IdentityHashMap<FailoverRegion, Object> visitedRegions = new IdentityHashMap<>();
+
+		// start from the failed region to visit all involved regions
+		Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+		visitedRegions.put(failedRegion, null);
+		regionsToVisit.add(failedRegion);
+		while (!regionsToVisit.isEmpty()) {
+			FailoverRegion regionToRestart = regionsToVisit.poll();
+
+			// an involved region should be restarted
+			regionsToRestart.put(regionToRestart, null);
 
-		// TODO: implement backtracking logic
+			// if a needed input result partition is not available, its producer region is involved
+			for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
+				for (FailoverEdge inEdge : vertex.getInputEdges()) {
+					if (!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID())) {
+						FailoverRegion producerRegion = vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
+						if (!visitedRegions.containsKey(producerRegion)) {
+							visitedRegions.put(producerRegion, null);
+							regionsToVisit.add(producerRegion);
+						}
+					}
+				}
+			}
+
+			// all consumer regions of an involved region should be involved
+			for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
+				for (FailoverEdge outEdge : vertex.getOutputEdges()) {
+					FailoverRegion consumerRegion = vertexToRegionMap.get(outEdge.getTargetVertex().getExecutionVertexID());
+					if (!visitedRegions.containsKey(consumerRegion)) {
+						visitedRegions.put(consumerRegion, null);
+						regionsToVisit.add(consumerRegion);
+					}
+				}
+			}
+		}
+
+		return regionsToRestart.keySet();
 	}
 
 	// ------------------------------------------------------------------------
@@ -208,7 +293,38 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	 * @return the failover region that contains the given execution vertex
 	 */
 	@VisibleForTesting
-	FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
-		return regions.get(vertexID);
+	public FailoverRegion getFailoverRegion(ExecutionVertexID vertexID) {
+		return vertexToRegionMap.get(vertexID);
+	}
+
+	/**
+	 * A stateful {@link ResultPartitionAvailabilityChecker} which maintains the failed partitions which are not available.
+	 */
+	private static class RegionFailoverResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
+
+		/** Result partition state checker from the shuffle master. */
+		private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
+
+		/** Records partitions which has caused {@link PartitionException}. */
+		private final HashSet<IntermediateResultPartitionID> failedPartitions;
+
+		RegionFailoverResultPartitionAvailabilityChecker(ResultPartitionAvailabilityChecker checker) {
+			this.resultPartitionAvailabilityChecker = checkNotNull(checker);
+			this.failedPartitions = new HashSet<>();
+		}
+
+		@Override
+		public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
+			return !failedPartitions.contains(resultPartitionID) &&
+				resultPartitionAvailabilityChecker.isAvailable(resultPartitionID);
+		}
+
+		public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
+			failedPartitions.add(resultPartitionID);
+		}
+
+		public void removeResultPartitionFromFailedState(IntermediateResultPartitionID resultPartitionID) {
+			failedPartitions.remove(resultPartitionID);
+		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
new file mode 100644
index 0000000..286ad3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ResultPartitionAvailabilityChecker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+/**
+ * This checker helps to query result partition availability.
+ */
+interface ResultPartitionAvailabilityChecker {
+
+	/**
+	 * Returns whether the given partition is available.
+	 *
+	 * @param resultPartitionID ID of the result partition to query
+	 * @return whether the given partition is available
+	 */
+	boolean isAvailable(IntermediateResultPartitionID resultPartitionID);
+}
+
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
index 9a3f595..5a9c844 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyTest.java
@@ -18,10 +18,22 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Iterator;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -30,37 +42,387 @@ import static org.junit.Assert.assertEquals;
 public class RestartPipelinedRegionStrategyTest extends TestLogger {
 
 	/**
-	 * Tests for scenes that a task fails for its own error, in which case only the
-	 * region containing the failed task should be restarted.
+	 * Tests for scenes that a task fails for its own error, in which case the
+	 * region containing the failed task and its consumer regions should be restarted.
 	 * <pre>
-	 *     (v1)
+	 *     (v1) -+-> (v4)
+	 *           x
+	 *     (v2) -+-> (v5)
 	 *
-	 *     (v2)
+	 *     (v3) -+-> (v6)
 	 *
-	 *     (v3)
+	 *           ^
+	 *           |
+	 *       (blocking)
 	 * </pre>
+	 * Each vertex is in an individual region.
 	 */
 	@Test
-	public void testRegionFailoverForTaskInternalErrors() throws Exception {
+	public void testRegionFailoverForRegionInternalErrors() throws Exception {
 		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
 
 		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
 		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
 		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+
+		topologyBuilder.connect(v1, v4, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v1, v5, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v2, v4, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v2, v5, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v3, v6, ResultPartitionType.BLOCKING);
 
 		FailoverTopology topology = topologyBuilder.build();
 
 		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
 
-		FailoverRegion r1 = strategy.getFailoverRegion(v1.getExecutionVertexID());
-		FailoverRegion r2 = strategy.getFailoverRegion(v2.getExecutionVertexID());
-		FailoverRegion r3 = strategy.getFailoverRegion(v3.getExecutionVertexID());
-
-		assertEquals(r1.getAllExecutionVertexIDs(),
+		// when v1 fails, {v1,v4,v5} should be restarted
+		HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
+		expectedResult.add(v1.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
 			strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")));
-		assertEquals(r2.getAllExecutionVertexIDs(),
+
+		// when v2 fails, {v2,v4,v5} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v2.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
 			strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")));
-		assertEquals(r3.getAllExecutionVertexIDs(),
+
+		// when v3 fails, {v3,v6} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v3.getExecutionVertexID());
+		expectedResult.add(v6.getExecutionVertexID());
+		assertEquals(expectedResult,
 			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")));
+
+		// when v4 fails, {v4} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v4.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v4.getExecutionVertexID(), new Exception("Test failure")));
+
+		// when v5 fails, {v5} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v5.getExecutionVertexID(), new Exception("Test failure")));
+
+		// when v6 fails, {v6} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v6.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v6.getExecutionVertexID(), new Exception("Test failure")));
+	}
+
+	/**
+	 * Tests for scenes that a task fails for data consumption error, in which case the
+	 * region containing the failed task, the region containing the unavailable result partition
+	 * and all their consumer regions should be restarted.
+	 * <pre>
+	 *     (v1) -+-> (v4)
+	 *           x
+	 *     (v2) -+-> (v5)
+	 *
+	 *     (v3) -+-> (v6)
+	 *
+	 *           ^
+	 *           |
+	 *       (blocking)
+	 * </pre>
+	 * Each vertex is in an individual region.
+	 */
+	@Test
+	public void testRegionFailoverForDataConsumptionErrors() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+
+		topologyBuilder.connect(v1, v4, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v1, v5, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v2, v4, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v2, v5, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v3, v6, ResultPartitionType.BLOCKING);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		// when v4 fails to consume data from v1, {v1,v4,v5} should be restarted
+		HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
+		Iterator<? extends FailoverEdge> v4InputEdgeIterator = v4.getInputEdges().iterator();
+		expectedResult.add(v1.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v4.getExecutionVertexID(),
+				new PartitionConnectionException(
+					new ResultPartitionID(
+						v4InputEdgeIterator.next().getResultPartitionID(),
+						new ExecutionAttemptID()),
+					new Exception("Test failure"))));
+
+		// when v4 fails to consume data from v2, {v2,v4,v5} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v2.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v4.getExecutionVertexID(),
+				new PartitionNotFoundException(
+					new ResultPartitionID(
+						v4InputEdgeIterator.next().getResultPartitionID(),
+						new ExecutionAttemptID()))));
+
+		// when v5 fails to consume data from v1, {v1,v4,v5} should be restarted
+		expectedResult.clear();
+		Iterator<? extends FailoverEdge> v5InputEdgeIterator = v5.getInputEdges().iterator();
+		expectedResult.add(v1.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v5.getExecutionVertexID(),
+				new PartitionConnectionException(
+					new ResultPartitionID(
+						v5InputEdgeIterator.next().getResultPartitionID(),
+						new ExecutionAttemptID()),
+					new Exception("Test failure"))));
+
+		// when v5 fails to consume data from v2, {v2,v4,v5} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v2.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v5.getExecutionVertexID(),
+				new PartitionNotFoundException(
+					new ResultPartitionID(
+						v5InputEdgeIterator.next().getResultPartitionID(),
+						new ExecutionAttemptID()))));
+
+		// when v6 fails to consume data from v3, {v3,v6} should be restarted
+		expectedResult.clear();
+		Iterator<? extends FailoverEdge> v6InputEdgeIterator = v6.getInputEdges().iterator();
+		expectedResult.add(v3.getExecutionVertexID());
+		expectedResult.add(v6.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v6.getExecutionVertexID(),
+				new PartitionConnectionException(
+					new ResultPartitionID(
+						v6InputEdgeIterator.next().getResultPartitionID(),
+						new ExecutionAttemptID()),
+					new Exception("Test failure"))));
+	}
+
+	/**
+	 * Tests to verify region failover results regarding different input result partition availability combinations.
+	 * <pre>
+	 *     (v1) --rp1--\
+	 *                 (v3)
+	 *     (v2) --rp2--/
+	 *
+	 *             ^
+	 *             |
+	 *         (blocking)
+	 * </pre>
+	 * Each vertex is in an individual region.
+	 * rp1, rp2 are result partitions.
+	 */
+	@Test
+	public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+
+		topologyBuilder.connect(v1, v3, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v2, v3, ResultPartitionType.BLOCKING);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		TestResultPartitionAvailabilityChecker availabilityChecker = new TestResultPartitionAvailabilityChecker();
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology, availabilityChecker);
+
+		IntermediateResultPartitionID rp1ID = v1.getOutputEdges().iterator().next().getResultPartitionID();
+		IntermediateResultPartitionID rp2ID = v2.getOutputEdges().iterator().next().getResultPartitionID();
+
+		// -------------------------------------------------
+		// Combination1: (rp1 == available, rp == available)
+		// -------------------------------------------------
+		availabilityChecker.failedPartitions.clear();
+
+		// when v1 fails, {v1,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v2 fails, {v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v3 fails, {v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v3.getExecutionVertexID()));
+
+		// -------------------------------------------------
+		// Combination2: (rp1 == unavailable, rp == available)
+		// -------------------------------------------------
+		availabilityChecker.failedPartitions.clear();
+		availabilityChecker.markResultPartitionFailed(rp1ID);
+
+		// when v1 fails, {v1,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v2 fails, {v1,v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v3 fails, {v1,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// -------------------------------------------------
+		// Combination3: (rp1 == available, rp == unavailable)
+		// -------------------------------------------------
+		availabilityChecker.failedPartitions.clear();
+		availabilityChecker.markResultPartitionFailed(rp2ID);
+
+		// when v1 fails, {v1,v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v2 fails, {v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v3 fails, {v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// -------------------------------------------------
+		// Combination4: (rp1 == unavailable, rp == unavailable)
+		// -------------------------------------------------
+		availabilityChecker.failedPartitions.clear();
+		availabilityChecker.markResultPartitionFailed(rp1ID);
+		availabilityChecker.markResultPartitionFailed(rp2ID);
+
+		// when v1 fails, {v1,v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v1.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v2 fails, {v1,v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v2.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+
+		// when v3 fails, {v1,v2,v3} should be restarted
+		assertThat(
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")),
+			containsInAnyOrder(v1.getExecutionVertexID(), v2.getExecutionVertexID(), v3.getExecutionVertexID()));
+	}
+
+	/**
+	 * Tests region failover scenes for topology with multiple vertices.
+	 * <pre>
+	 *     (v1) ---> (v2) --|--> (v3) ---> (v4) --|--> (v5) ---> (v6)
+	 *
+	 *           ^          ^          ^          ^          ^
+	 *           |          |          |          |          |
+	 *     (pipelined) (blocking) (pipelined) (blocking) (pipelined)
+	 * </pre>
+	 * Component 1: 1,2; component 2: 3,4; component 3: 5,6
+	 */
+	@Test
+	public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
+		TestFailoverTopology.Builder topologyBuilder = new TestFailoverTopology.Builder();
+
+		TestFailoverTopology.TestFailoverVertex v1 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v2 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v3 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v4 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v5 = topologyBuilder.newVertex();
+		TestFailoverTopology.TestFailoverVertex v6 = topologyBuilder.newVertex();
+
+		topologyBuilder.connect(v1, v2, ResultPartitionType.PIPELINED);
+		topologyBuilder.connect(v2, v3, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v3, v4, ResultPartitionType.PIPELINED);
+		topologyBuilder.connect(v4, v5, ResultPartitionType.BLOCKING);
+		topologyBuilder.connect(v5, v6, ResultPartitionType.PIPELINED);
+
+		FailoverTopology topology = topologyBuilder.build();
+
+		RestartPipelinedRegionStrategy strategy = new RestartPipelinedRegionStrategy(topology);
+
+		// when v3 fails due to internal error, {v3,v4,v5,v6} should be restarted
+		HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
+		expectedResult.add(v3.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		expectedResult.add(v6.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(), new Exception("Test failure")));
+
+		// when v3 fails to consume from v2, {v1,v2,v3,v4,v5,v6} should be restarted
+		expectedResult.clear();
+		expectedResult.add(v1.getExecutionVertexID());
+		expectedResult.add(v2.getExecutionVertexID());
+		expectedResult.add(v3.getExecutionVertexID());
+		expectedResult.add(v4.getExecutionVertexID());
+		expectedResult.add(v5.getExecutionVertexID());
+		expectedResult.add(v6.getExecutionVertexID());
+		assertEquals(expectedResult,
+			strategy.getTasksNeedingRestart(v3.getExecutionVertexID(),
+				new PartitionConnectionException(
+					new ResultPartitionID(
+						v3.getInputEdges().iterator().next().getResultPartitionID(),
+						new ExecutionAttemptID()),
+					new Exception("Test failure"))));
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static class TestResultPartitionAvailabilityChecker implements ResultPartitionAvailabilityChecker {
+
+		private final HashSet<IntermediateResultPartitionID> failedPartitions;
+
+		public TestResultPartitionAvailabilityChecker() {
+			this.failedPartitions = new HashSet<>();
+		}
+
+		@Override
+		public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
+			return !failedPartitions.contains(resultPartitionID);
+		}
+
+		public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
+			failedPartitions.add(resultPartitionID);
+		}
+
+		public void removeResultPartitionFromFailedState(IntermediateResultPartitionID resultPartitionID) {
+			failedPartitions.remove(resultPartitionID);
+		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
index 9daec8e..6b8b5fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestFailoverTopology.java
@@ -130,9 +130,7 @@ public class TestFailoverTopology implements FailoverTopology {
 		private Collection<FailoverVertex> vertices = new ArrayList<>();
 
 		public TestFailoverVertex newVertex() {
-			TestFailoverVertex testFailoverVertex = newVertex(UUID.randomUUID().toString());
-			vertices.add(testFailoverVertex);
-			return testFailoverVertex;
+			return newVertex(UUID.randomUUID().toString());
 		}
 
 		public TestFailoverVertex newVertex(String name) {
@@ -149,6 +147,14 @@ public class TestFailoverTopology implements FailoverTopology {
 			return this;
 		}
 
+		public Builder connect(TestFailoverVertex source, TestFailoverVertex target, ResultPartitionType partitionType, IntermediateResultPartitionID partitionID) {
+			FailoverEdge edge = new TestFailoverEdge(partitionID, partitionType, source, target);
+			source.addOuputEdge(edge);
+			target.addInputEdge(edge);
+
+			return this;
+		}
+
 		public Builder setContainsCoLocationConstraints(boolean containsCoLocationConstraints) {
 			this.containsCoLocationConstraints = containsCoLocationConstraints;
 			return this;