Posted to commits@flink.apache.org by ga...@apache.org on 2019/07/03 23:03:36 UTC

[flink] 04/04: [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a682ff7bfa6f0073759ec716b09bf6e06a3d36b
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed Jul 3 20:20:34 2019 +0200

    [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling
    
    Implement an adapter (AdaptedRestartPipelinedRegionStrategyNG) that adapts
    org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
    to the legacy failover strategy interface
    (org.apache.flink.runtime.executiongraph.failover.FailoverStrategy).
    
    The new AdaptedRestartPipelinedRegionStrategyNG is chosen if config option
    jobmanager.execution.failover-strategy is set to "region". The legacy behavior
    can be enabled by setting the config option to "region-legacy".
    
    This closes #8922.
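
    As a usage note (not part of the commit above): the option named there is normally
    set in flink-conf.yaml; the sketch below only illustrates the key and its two
    accepted values programmatically, assuming Flink's Configuration API.

        import org.apache.flink.configuration.Configuration;

        public class FailoverStrategyConfigSketch {
            public static void main(String[] args) {
                final Configuration conf = new Configuration();
                // selects the new AdaptedRestartPipelinedRegionStrategyNG
                conf.setString("jobmanager.execution.failover-strategy", "region");
                // or, to keep the legacy RestartPipelinedRegionStrategy:
                // conf.setString("jobmanager.execution.failover-strategy", "region-legacy");
            }
        }
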
---
 .../runtime/executiongraph/ExecutionGraph.java     | 156 +------
 .../runtime/executiongraph/SchedulingUtils.java    | 218 ++++++++++
 .../AdaptedRestartPipelinedRegionStrategyNG.java   | 316 ++++++++++++++
 .../failover/FailoverStrategyLoader.java           |   8 +-
 .../runtime/scheduler/ExecutionVertexVersion.java  |  45 ++
 .../scheduler/ExecutionVertexVersioner.java        |  76 ++++
 ...egionStrategyNGAbortPendingCheckpointsTest.java | 171 ++++++++
 ...inedRegionStrategyNGConcurrentFailoverTest.java | 284 +++++++++++++
 ...startPipelinedRegionStrategyNGFailoverTest.java | 459 +++++++++++++++++++++
 .../executiongraph/ExecutionGraphRestartTest.java  | 108 ++++-
 .../PipelinedFailoverRegionBuildingTest.java       |   2 +-
 ...RestartPipelinedRegionStrategyBuildingTest.java |   4 +-
 .../scheduler/ExecutionVertexVersionerTest.java    | 121 ++++++
 13 files changed, 1807 insertions(+), 161 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ce65b68..0840c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
@@ -65,9 +64,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
@@ -108,16 +104,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -952,11 +945,11 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			switch (scheduleMode) {
 
 				case LAZY_FROM_SOURCES:
-					newSchedulingFuture = scheduleLazy(slotProvider);
+					newSchedulingFuture = scheduleLazy();
 					break;
 
 				case EAGER:
-					newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
+					newSchedulingFuture = scheduleEager();
 					break;
 
 				default:
@@ -985,123 +978,16 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
-	private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {
-
-		final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
-		// simply take the vertices without inputs.
-		for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-			if (ejv.getJobVertex().isInputVertex()) {
-				final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
-					slotProvider,
-					allowQueuedScheduling,
-					LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty
-					Collections.emptySet());
-
-				schedulingFutures.add(schedulingJobVertexFuture);
-			}
-		}
-
-		return FutureUtils.waitForAll(schedulingFutures);
+	private CompletableFuture<Void> scheduleLazy() {
+		return SchedulingUtils.scheduleLazy(getAllExecutionVertices(), this);
 	}
 
 	/**
-	 *
-	 *
-	 * @param slotProvider  The resource provider from which the slots are allocated
-	 * @param timeout       The maximum time that the deployment may take, before a
-	 *                      TimeoutException is thrown.
 	 * @return Future which is completed once the {@link ExecutionGraph} has been scheduled.
 	 * The future can also be completed exceptionally if an error happened.
 	 */
-	private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
-		assertRunningInJobMasterMainThread();
-		checkState(state == JobStatus.RUNNING, "job is not running currently");
-
-		// Important: reserve all the space we need up front.
-		// that way we do not have any operation that can fail between allocating the slots
-		// and adding them to the list. If we had a failure in between there, that would
-		// cause the slots to get lost
-		final boolean queued = allowQueuedScheduling;
-
-		// collecting all the slots may resize and fail in that operation without slots getting lost
-		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
-
-		final Set<AllocationID> allPreviousAllocationIds =
-			Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());
-
-		// allocate the slots (obtain all their futures
-		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
-			// these calls are not blocking, they only return futures
-			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
-				slotProvider,
-				queued,
-				LocationPreferenceConstraint.ALL,
-				allPreviousAllocationIds,
-				timeout);
-
-			allAllocationFutures.addAll(allocationFutures);
-		}
-
-		// this future is complete once all slot futures are complete.
-		// the future fails once one slot future fails.
-		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
-
-		return allAllocationsFuture.thenAccept(
-			(Collection<Execution> executionsToDeploy) -> {
-				for (Execution execution : executionsToDeploy) {
-					try {
-						execution.deploy();
-					} catch (Throwable t) {
-						throw new CompletionException(
-							new FlinkException(
-								String.format("Could not deploy execution %s.", execution),
-								t));
-					}
-				}
-			})
-			// Generate a more specific failure message for the eager scheduling
-			.exceptionally(
-				(Throwable throwable) -> {
-					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-					final Throwable resultThrowable;
-					if (strippedThrowable instanceof TimeoutException) {
-						int numTotal = allAllocationsFuture.getNumFuturesTotal();
-						int numComplete = allAllocationsFuture.getNumFuturesCompleted();
-
-						String message = "Could not allocate all requires slots within timeout of " +
-							timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete +
-								", previous allocation IDs: " + allPreviousAllocationIds;
-
-						StringBuilder executionMessageBuilder = new StringBuilder();
-
-						for (int i = 0; i < allAllocationFutures.size(); i++) {
-							CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);
-
-							try {
-								Execution execution = executionFuture.getNow(null);
-								if (execution != null) {
-									executionMessageBuilder.append("completed: " + execution);
-								} else {
-									executionMessageBuilder.append("incomplete: " + executionFuture);
-								}
-							} catch (CompletionException completionException) {
-								executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture);
-							}
-
-							if (i < allAllocationFutures.size() - 1) {
-								executionMessageBuilder.append(", ");
-							}
-						}
-
-						message += ", execution status: " + executionMessageBuilder.toString();
-
-						resultThrowable = new NoResourceAvailableException(message);
-					} else {
-						resultThrowable = strippedThrowable;
-					}
-
-					throw new CompletionException(resultThrowable);
-				});
+	private CompletableFuture<Void> scheduleEager() {
+		return SchedulingUtils.scheduleEager(getAllExecutionVertices(), this);
 	}
 
 	public void cancel() {
@@ -1414,7 +1300,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	 * and is used to disambiguate concurrent modifications between local and global
 	 * failover actions.
 	 */
-	long getGlobalModVersion() {
+	public long getGlobalModVersion() {
 		return globalModVersion;
 	}
 
@@ -1808,34 +1694,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
-	/**
-	 * Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
-	 */
-	private Set<AllocationID> computeAllPriorAllocationIds() {
-		HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>(getNumberOfExecutionJobVertices());
-		for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
-			AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
-			if (latestPriorAllocation != null) {
-				allPreviousAllocationIds.add(latestPriorAllocation);
-			}
-		}
-		return allPreviousAllocationIds;
-	}
-
-	/**
-	 * Returns the result of {@link #computeAllPriorAllocationIds()}, but only if the scheduling really requires it.
-	 * Otherwise this method simply returns an empty set.
-	 */
-	private Set<AllocationID> computeAllPriorAllocationIdsIfRequiredByScheduling() {
-		// This is a temporary optimization to avoid computing all previous allocations if not required
-		// This can go away when we progress with the implementation of the Scheduler.
-		if (slotProvider instanceof Scheduler && ((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) {
-			return computeAllPriorAllocationIds();
-		} else {
-			return Collections.emptySet();
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
new file mode 100644
index 0000000..950fcb1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class contains scheduling logic for EAGER and LAZY_FROM_SOURCES.
+ * It is used for normal scheduling and legacy failover strategy re-scheduling.
+ */
+public class SchedulingUtils {
+
+	/**
+	 * Schedule vertices lazily. That means only vertices whose input constraints are satisfied will be scheduled.
+	 *
+	 * @param vertices Topologically sorted vertices to schedule.
+	 * @param executionGraph The graph the given vertices belong to.
+	 */
+	public static CompletableFuture<Void> scheduleLazy(
+			final Iterable<ExecutionVertex> vertices,
+			final ExecutionGraph executionGraph) {
+
+		executionGraph.assertRunningInJobMasterMainThread();
+
+		final Set<AllocationID> previousAllocations = computePriorAllocationIdsIfRequiredByScheduling(
+			vertices, executionGraph.getSlotProvider());
+
+		final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>();
+		for (ExecutionVertex executionVertex : vertices) {
+			// only schedule vertex when its input constraint is satisfied
+			if (executionVertex.getJobVertex().getJobVertex().isInputVertex() ||
+				executionVertex.checkInputDependencyConstraints()) {
+
+				final CompletableFuture<Void> schedulingVertexFuture = executionVertex.scheduleForExecution(
+					executionGraph.getSlotProvider(),
+					executionGraph.isQueuedSchedulingAllowed(),
+					LocationPreferenceConstraint.ANY,
+					previousAllocations);
+
+				schedulingFutures.add(schedulingVertexFuture);
+			}
+		}
+
+		return FutureUtils.waitForAll(schedulingFutures);
+	}
+
+	/**
+	 * Schedule vertices eagerly. That means all vertices will be scheduled at once.
+	 *
+	 * @param vertices Topologically sorted vertices to schedule.
+	 * @param executionGraph The graph the given vertices belong to.
+	 */
+	public static CompletableFuture<Void> scheduleEager(
+			final Iterable<ExecutionVertex> vertices,
+			final ExecutionGraph executionGraph) {
+
+		executionGraph.assertRunningInJobMasterMainThread();
+
+		checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently");
+
+		// Important: reserve all the space we need up front.
+		// that way we do not have any operation that can fail between allocating the slots
+		// and adding them to the list. If we had a failure in between there, that would
+		// cause the slots to get lost
+		final boolean queued = executionGraph.isQueuedSchedulingAllowed();
+
+		// collecting all the slots may resize and fail in that operation without slots getting lost
+		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();
+
+		final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
+			computePriorAllocationIdsIfRequiredByScheduling(vertices, executionGraph.getSlotProvider()));
+
+		// allocate the slots (obtain all their futures)
+		for (ExecutionVertex ev : vertices) {
+			// these calls are not blocking, they only return futures
+			CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
+				executionGraph.getSlotProvider(),
+				queued,
+				LocationPreferenceConstraint.ALL,
+				allPreviousAllocationIds,
+				executionGraph.getAllocationTimeout());
+
+			allAllocationFutures.add(allocationFuture);
+		}
+
+		// this future is complete once all slot futures are complete.
+		// the future fails once one slot future fails.
+		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
+
+		return allAllocationsFuture.thenAccept(
+			(Collection<Execution> executionsToDeploy) -> {
+				for (Execution execution : executionsToDeploy) {
+					try {
+						execution.deploy();
+					} catch (Throwable t) {
+						throw new CompletionException(
+							new FlinkException(
+								String.format("Could not deploy execution %s.", execution),
+								t));
+					}
+				}
+			})
+			// Generate a more specific failure message for the eager scheduling
+			.exceptionally(
+				(Throwable throwable) -> {
+					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+					final Throwable resultThrowable;
+					if (strippedThrowable instanceof TimeoutException) {
+						int numTotal = allAllocationsFuture.getNumFuturesTotal();
+						int numComplete = allAllocationsFuture.getNumFuturesCompleted();
+
+						String message = "Could not allocate all required slots within timeout of "
+							+ executionGraph.getAllocationTimeout() + ". Slots required: "
+							+ numTotal + ", slots allocated: " + numComplete
+							+ ", previous allocation IDs: " + allPreviousAllocationIds;
+
+						StringBuilder executionMessageBuilder = new StringBuilder();
+
+						for (int i = 0; i < allAllocationFutures.size(); i++) {
+							CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);
+
+							try {
+								Execution execution = executionFuture.getNow(null);
+								if (execution != null) {
+									executionMessageBuilder.append("completed: " + execution);
+								} else {
+									executionMessageBuilder.append("incomplete: " + executionFuture);
+								}
+							} catch (CompletionException completionException) {
+								executionMessageBuilder.append("completed exceptionally: "
+									+ completionException + "/" + executionFuture);
+							}
+
+							if (i < allAllocationFutures.size() - 1) {
+								executionMessageBuilder.append(", ");
+							}
+						}
+
+						message += ", execution status: " + executionMessageBuilder.toString();
+
+						resultThrowable = new NoResourceAvailableException(message);
+					} else {
+						resultThrowable = strippedThrowable;
+					}
+
+					throw new CompletionException(resultThrowable);
+				});
+	}
+
+	/**
+	 * Returns the result of {@link #computePriorAllocationIds(Iterable)},
+	 * but only if the scheduling really requires it.
+	 * Otherwise this method simply returns an empty set.
+	 */
+	private static Set<AllocationID> computePriorAllocationIdsIfRequiredByScheduling(
+			final Iterable<ExecutionVertex> vertices,
+			final SlotProvider slotProvider) {
+		// This is a temporary optimization to avoid computing all previous allocations if not required
+		// This can go away when we progress with the implementation of the Scheduler.
+		if (slotProvider instanceof Scheduler &&
+			((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) {
+
+			return computePriorAllocationIds(vertices);
+		} else {
+			return Collections.emptySet();
+		}
+	}
+
+	/**
+	 * Computes and returns a set with the prior allocation ids for the given execution vertices.
+	 */
+	private static Set<AllocationID> computePriorAllocationIds(final Iterable<ExecutionVertex> vertices) {
+		HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>();
+		for (ExecutionVertex executionVertex : vertices) {
+			AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
+			if (latestPriorAllocation != null) {
+				allPreviousAllocationIds.add(latestPriorAllocation);
+			}
+		}
+		return allPreviousAllocationIds;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
new file mode 100644
index 0000000..5fddac3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.executiongraph.SchedulingUtils;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class);
+
+	/** The execution graph on which this FailoverStrategy works. */
+	private final ExecutionGraph executionGraph;
+
+	/** The versioner helps to maintain execution vertex versions. */
+	private final ExecutionVertexVersioner executionVertexVersioner;
+
+	/** The underlying new generation region failover strategy. */
+	private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy;
+
+	public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.executionVertexVersioner = new ExecutionVertexVersioner();
+	}
+
+	@Override
+	public void onTaskFailure(final Execution taskExecution, final Throwable cause) {
+		if (!executionGraph.getRestartStrategy().canRestart()) {
+			// delegate the failure to a global failure which will check the restart strategy and not restart
+			LOG.info("Failed to pass the restart strategy validation in region failover. Falling back to a global failure.");
+			failGlobal(cause);
+			return;
+		}
+
+		if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) {
+			LOG.info("Skip current region failover as a global failover is ongoing.");
+			return;
+		}
+
+		final ExecutionVertexID vertexID = getExecutionVertexID(taskExecution.getVertex());
+
+		final Set<ExecutionVertexID> tasksToRestart = restartPipelinedRegionStrategy.getTasksNeedingRestart(vertexID, cause);
+		restartTasks(tasksToRestart);
+	}
+
+	@VisibleForTesting
+	protected void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
+		final long globalModVersion = executionGraph.getGlobalModVersion();
+		final Set<ExecutionVertexVersion> vertexVersions = new HashSet<>(
+			executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		FutureUtils.assertNoException(
+			cancelTasks(verticesToRestart)
+				.thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
+				.handle(failGlobalOnError()));
+	}
+
+	private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
+		return () -> {
+			if (!isLocalFailoverValid(globalModVersion)) {
+				LOG.info("Skip current region failover as a global failover is ongoing.");
+				return;
+			}
+
+			// find out which vertices are still valid to restart.
+			// some vertices involved in this failover may be modified if another region
+			// failover happens during the cancellation stage of this failover.
+			// Will ignore the modified vertices as the other failover will deal with them.
+			final Set<ExecutionVertex> unmodifiedVertices = executionVertexVersioner
+				.getUnmodifiedExecutionVertices(vertexVersions)
+				.stream()
+				.map(this::getExecutionVertex)
+				.collect(Collectors.toSet());
+
+			try {
+				LOG.info("Restarting {} tasks to recover from the task failure.", unmodifiedVertices.size());
+
+				// reset tasks to CREATED state and reload state
+				resetTasks(unmodifiedVertices, globalModVersion);
+
+				// re-schedule tasks
+				rescheduleTasks(unmodifiedVertices, globalModVersion);
+			} catch (GlobalModVersionMismatch e) {
+				throw new IllegalStateException(
+					"Bug: ExecutionGraph was concurrently modified outside of main thread", e);
+			} catch (Exception e) {
+				throw new CompletionException(e);
+			}
+		};
+	}
+
+	private BiFunction<Object, Throwable, Object> failGlobalOnError() {
+		return (Object ignored, Throwable t) -> {
+			if (t != null) {
+				LOG.info("Unexpected error happened in region failover. Failing globally.", t);
+				failGlobal(t);
+			}
+			return null;
+		};
+	}
+
+	@VisibleForTesting
+	protected CompletableFuture<?> cancelTasks(final Set<ExecutionVertexID> vertices) {
+		final List<CompletableFuture<?>> cancelFutures = vertices.stream()
+			.map(this::cancelExecutionVertex)
+			.collect(Collectors.toList());
+
+		return FutureUtils.combineAll(cancelFutures);
+	}
+
+	private void resetTasks(final Set<ExecutionVertex> vertices, final long globalModVersion) throws Exception {
+		final Set<CoLocationGroup> colGroups = new HashSet<>();
+		final long restartTimestamp = System.currentTimeMillis();
+
+		for (ExecutionVertex ev : vertices) {
+			CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
+			if (cgroup != null && !colGroups.contains(cgroup)) {
+				cgroup.resetConstraints();
+				colGroups.add(cgroup);
+			}
+
+			ev.resetForNewExecution(restartTimestamp, globalModVersion);
+		}
+
+		// if there is checkpointed state, reload it into the executions
+		if (executionGraph.getCheckpointCoordinator() != null) {
+			// abort pending checkpoints to
+			// i) enable new checkpoint triggering without waiting for the last checkpoint to expire.
+			// ii) ensure the EXACTLY_ONCE semantics if needed.
+			executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+				new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+
+			final Map<JobVertexID, ExecutionJobVertex> involvedExecutionJobVertices =
+				getInvolvedExecutionJobVertices(vertices);
+			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+				involvedExecutionJobVertices, false, true);
+		}
+	}
+
+	private void rescheduleTasks(final Set<ExecutionVertex> vertices, final long globalModVersion) throws Exception {
+
+		// sort vertices topologically
+		// this is to reduce the chance that downstream tasks are launched before their upstream tasks,
+		// which would cause lots of partition state checks in EAGER mode
+		final List<ExecutionVertex> sortedVertices = sortVerticesTopologically(vertices);
+
+		final CompletableFuture<Void> newSchedulingFuture;
+		switch (executionGraph.getScheduleMode()) {
+
+			case LAZY_FROM_SOURCES:
+				newSchedulingFuture = SchedulingUtils.scheduleLazy(sortedVertices, executionGraph);
+				break;
+
+			case EAGER:
+				newSchedulingFuture = SchedulingUtils.scheduleEager(sortedVertices, executionGraph);
+				break;
+
+			default:
+				throw new JobException("Schedule mode is invalid.");
+		}
+
+		// if no global failover is triggered in the scheduling process,
+		// register a failure handling callback to the scheduling
+		if (isLocalFailoverValid(globalModVersion)) {
+			newSchedulingFuture.whenComplete(
+				(Void ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+						if (!(strippedThrowable instanceof CancellationException)) {
+							// only fail if the scheduling future was not canceled
+							failGlobal(strippedThrowable);
+						}
+					}
+				});
+		}
+	}
+
+	private boolean isLocalFailoverValid(final long globalModVersion) {
+		// local failover is valid only if the job is still RUNNING and
+		// no global failover happens since the given globalModVersion is recorded
+		return executionGraph.getState() == JobStatus.RUNNING &&
+			executionGraph.getGlobalModVersion() == globalModVersion;
+	}
+
+	private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+		return getExecutionVertex(executionVertexId).cancel();
+	}
+
+	private Map<JobVertexID, ExecutionJobVertex> getInvolvedExecutionJobVertices(
+		final Set<ExecutionVertex> executionVertices) {
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		for (ExecutionVertex executionVertex : executionVertices) {
+			JobVertexID jobvertexId = executionVertex.getJobvertexId();
+			ExecutionJobVertex jobVertex = executionVertex.getJobVertex();
+			tasks.putIfAbsent(jobvertexId, jobVertex);
+		}
+		return tasks;
+	}
+
+	private void failGlobal(final Throwable cause) {
+		executionGraph.failGlobal(cause);
+	}
+
+	private ExecutionVertex getExecutionVertex(final ExecutionVertexID vertexID) {
+		return executionGraph.getAllVertices()
+			.get(vertexID.getJobVertexId())
+			.getTaskVertices()[vertexID.getSubtaskIndex()];
+	}
+
+	private ExecutionVertexID getExecutionVertexID(final ExecutionVertex vertex) {
+		return new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex());
+	}
+
+	private List<ExecutionVertex> sortVerticesTopologically(final Set<ExecutionVertex> vertices) {
+		// group execution vertices by jobVertexId
+		final Map<JobVertexID, List<ExecutionVertex>> verticesMap = new HashMap<>();
+		for (ExecutionVertex vertex : vertices) {
+			verticesMap.computeIfAbsent(vertex.getJobvertexId(), id -> new ArrayList<>()).add(vertex);
+		}
+
+		// sort in jobVertex topological order
+		final List<ExecutionVertex> sortedVertices = new ArrayList<>(vertices.size());
+		for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
+			sortedVertices.addAll(verticesMap.getOrDefault(jobVertex.getJobVertexId(), Collections.emptyList()));
+		}
+		return sortedVertices;
+	}
+
+	@Override
+	public void notifyNewVertices(final List<ExecutionJobVertex> newJobVerticesTopological) {
+		// build the underlying new generation failover strategy when the executionGraph vertices are all added,
+		// otherwise the failover topology will not be correctly built.
+		// currently it's safe to add it here, as this method is invoked only once in production code.
+		checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once");
+		this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy(
+			new DefaultFailoverTopology(executionGraph));
+	}
+
+	@Override
+	public String getStrategyName() {
+		return "New Pipelined Region Failover";
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory that instantiates the AdaptedRestartPipelinedRegionStrategyNG.
+	 */
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public FailoverStrategy create(final ExecutionGraph executionGraph) {
+			return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph);
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
index eb003b9..e80c6ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -38,9 +38,12 @@ public class FailoverStrategyLoader {
 	/** Config name for the {@link RestartIndividualStrategy}. */
 	public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
 
-	/** Config name for the {@link RestartPipelinedRegionStrategy} */
+	/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */
 	public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";
 
+	/** Config name for the {@link RestartPipelinedRegionStrategy}. */
+	public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy";
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -63,6 +66,9 @@ public class FailoverStrategyLoader {
 					return new RestartAllStrategy.Factory();
 
 				case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+					return new AdaptedRestartPipelinedRegionStrategyNG.Factory();
+
+				case LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME:
 					return new RestartPipelinedRegionStrategy.Factory();
 
 				case INDIVIDUAL_RESTART_STRATEGY_NAME:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java
new file mode 100644
index 0000000..28ae3cf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * This class helps to record the version of an execution vertex.
+ */
+public class ExecutionVertexVersion {
+
+	private final ExecutionVertexID executionVertexId;
+
+	private final long version;
+
+	ExecutionVertexVersion(final ExecutionVertexID executionVertexId, final long version) {
+		this.executionVertexId = executionVertexId;
+		this.version = version;
+	}
+
+	public ExecutionVertexID getExecutionVertexId() {
+		return executionVertexId;
+	}
+
+	public long getVersion() {
+		return version;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
new file mode 100644
index 0000000..081b6ff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Records modifications of
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex ExecutionVertices}, and allows
+ * for checking whether a vertex was modified.
+ *
+ * <p>Examples for modifications include:
+ * <ul>
+ *     <li>cancellation of the underlying execution
+ *     <li>deployment of the execution vertex
+ * </ul>
+ *
+ * @see DefaultScheduler
+ */
+public class ExecutionVertexVersioner {
+
+	private final Map<ExecutionVertexID, Long> executionVertexToVersion = new HashMap<>();
+
+	public ExecutionVertexVersion recordModification(final ExecutionVertexID executionVertexId) {
+		final Long newVersion = executionVertexToVersion.merge(executionVertexId, 1L, Long::sum);
+		return new ExecutionVertexVersion(executionVertexId, newVersion);
+	}
+
+	public Map<ExecutionVertexID, ExecutionVertexVersion> recordVertexModifications(
+		final Collection<ExecutionVertexID> vertices) {
+		return vertices.stream()
+			.map(this::recordModification)
+			.collect(Collectors.toMap(ExecutionVertexVersion::getExecutionVertexId, Function.identity()));
+	}
+
+	public boolean isModified(final ExecutionVertexVersion executionVertexVersion) {
+		final Long currentVersion = executionVertexToVersion.get(executionVertexVersion.getExecutionVertexId());
+		Preconditions.checkState(currentVersion != null,
+			"Execution vertex %s does not have a recorded version",
+			executionVertexVersion.getExecutionVertexId());
+		return currentVersion != executionVertexVersion.getVersion();
+	}
+
+	public Set<ExecutionVertexID> getUnmodifiedExecutionVertices(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return executionVertexVersions.stream()
+			.filter(executionVertexVersion -> !isModified(executionVertexVersion))
+			.map(ExecutionVertexVersion::getExecutionVertexId)
+			.collect(Collectors.toSet());
+	}
+
+}
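
The versioner above is what allows AdaptedRestartPipelinedRegionStrategyNG to detect,
after the asynchronous cancellation phase, whether a concurrent failover has already
touched one of its vertices. A minimal sketch of that record-then-check pattern follows;
the vertex IDs are hypothetical and created here only for illustration.

    import org.apache.flink.runtime.jobgraph.JobVertexID;
    import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
    import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
    import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class VersionerSketch {
        public static void main(String[] args) {
            final ExecutionVertexVersioner versioner = new ExecutionVertexVersioner();

            // two hypothetical subtasks of the same job vertex
            final JobVertexID jobVertexId = new JobVertexID();
            final ExecutionVertexID v0 = new ExecutionVertexID(jobVertexId, 0);
            final ExecutionVertexID v1 = new ExecutionVertexID(jobVertexId, 1);

            // record the versions of the vertices a failover is about to restart
            final Set<ExecutionVertexVersion> recorded = new HashSet<>(
                versioner.recordVertexModifications(Arrays.asList(v0, v1)).values());

            // a concurrent failover modifies v1 while the first one is still cancelling
            versioner.recordModification(v1);

            // only v0 is still unmodified and may be rescheduled by the first failover
            final Set<ExecutionVertexID> unmodified = versioner.getUnmodifiedExecutionVertices(recorded);
            System.out.println(unmodified.contains(v0)); // prints true
            System.out.println(unmodified.contains(v1)); // prints false
        }
    }
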
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
new file mode 100644
index 0000000..17271b7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger {
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
+	}
+
+	@Test
+	public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
+		final JobGraph jobGraph = createStreamingJobGraph();
+		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
+		final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+
+		setTaskRunning(executionGraph, onlyExecutionVertex);
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		checkState(checkpointCoordinator != null);
+
+		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+		final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+
+		failVertex(onlyExecutionVertex);
+
+		assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1)));
+		assertNoPendingCheckpoints(checkpointCoordinator);
+	}
+
+	private void setTaskRunning(final ExecutionGraph executionGraph, final ExecutionVertex executionVertex) {
+		executionGraph.updateState(
+			new TaskExecutionState(executionGraph.getJobID(),
+				executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+				ExecutionState.RUNNING));
+	}
+
+	private void failVertex(final ExecutionVertex onlyExecutionVertex) {
+		onlyExecutionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		manualMainThreadExecutor.triggerAll();
+	}
+
+	private static JobGraph createStreamingJobGraph() {
+		final JobVertex v1 = new JobVertex("vertex1");
+		v1.setInvokableClass(AbstractInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph(v1);
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+		return jobGraph;
+	}
+
+	private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+		final ExecutionGraph executionGraph = new ExecutionGraph(
+			new DummyJobInformation(
+				jobGraph.getJobID(),
+				jobGraph.getName()),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			new InfiniteDelayRestartStrategy(10),
+			AdaptedRestartPipelinedRegionStrategyNG::new,
+			new SimpleSlotProvider(jobGraph.getJobID(), 1));
+
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		enableCheckpointing(executionGraph);
+		executionGraph.setScheduleMode(jobGraph.getScheduleMode());
+		executionGraph.start(componentMainThreadExecutor);
+		executionGraph.scheduleForExecution();
+		manualMainThreadExecutor.triggerAll();
+		return executionGraph;
+	}
+
+	private static void enableCheckpointing(final ExecutionGraph executionGraph) {
+		final List<ExecutionJobVertex> jobVertices = new ArrayList<>(executionGraph.getAllVertices().values());
+		final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+			Long.MAX_VALUE,
+			Long.MAX_VALUE,
+			0,
+			1,
+			CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION,
+			true,
+			false,
+			0);
+
+		executionGraph.enableCheckpointing(
+			checkpointCoordinatorConfiguration,
+			jobVertices,
+			jobVertices,
+			jobVertices,
+			Collections.emptyList(),
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			new MemoryStateBackend(),
+			new CheckpointStatsTracker(
+				0,
+				jobVertices,
+				checkpointCoordinatorConfiguration,
+				new UnregisteredMetricsGroup()));
+	}
+
+	private static void assertNoPendingCheckpoints(final CheckpointCoordinator checkpointCoordinator) {
+		assertThat(checkpointCoordinator.getPendingCheckpoints().entrySet(), is(empty()));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
new file mode 100644
index 0000000..20fb083
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private static final int DEFAULT_PARALLELISM = 2;
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
+		manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+	}
+
+	/**
+	 * Tests that 2 concurrent region failovers lead to a correct vertex state.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testConcurrentRegionFailovers() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause {ev11} failure and delay the local recovery action via the manual executor
+		//  - cause {ev12} failure and delay the local recovery action via the manual executor
+		//  - resume local recovery actions
+		//  - validate that each task is restarted only once
+
+		final ExecutionGraph eg = createExecutionGraph();
+
+		final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+			(TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+		failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// start job scheduling
+		eg.scheduleForExecution();
+		manualMainThreadExecutor.triggerAll();
+
+		// fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22}
+		ev11.getCurrentExecutionAttempt().fail(new Exception("task failure 1"));
+		manualMainThreadExecutor.triggerAll();
+		assertEquals(ExecutionState.FAILED, ev11.getExecutionState());
+		assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+		assertEquals(ExecutionState.CANCELED, ev21.getExecutionState());
+		assertEquals(ExecutionState.CANCELED, ev22.getExecutionState());
+
+		// fail ev12 to trigger region failover of {ev12}, {ev21}, {ev22}
+		ev12.getCurrentExecutionAttempt().fail(new Exception("task failure 2"));
+		manualMainThreadExecutor.triggerAll();
+		assertEquals(ExecutionState.FAILED, ev11.getExecutionState());
+		assertEquals(ExecutionState.FAILED, ev12.getExecutionState());
+		assertEquals(ExecutionState.CANCELED, ev21.getExecutionState());
+		assertEquals(ExecutionState.CANCELED, ev22.getExecutionState());
+
+		// complete region failover blocker to trigger region failover recovery
+		failoverStrategy.getBlockerFuture().complete(null);
+		manualMainThreadExecutor.triggerAll();
+
+		// verify that all tasks are recovered and no task is restarted more than once
+		assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
+		assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+		assertEquals(ExecutionState.CREATED, ev21.getExecutionState());
+		assertEquals(ExecutionState.CREATED, ev22.getExecutionState());
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+	}
+
+	/**
+	 * Tests that a global failover will take precedence over local failovers.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testRegionFailoverInterruptedByGlobalFailover() throws Exception {
+
+		// the logic in this test is as follows:
+		//  - start a job
+		//  - cause a task failure and delay the local recovery action via the manual executor
+		//  - cause a global failure
+		//  - resume the local recovery action
+		//  - validate that the local recovery does not restart tasks
+
+		final ExecutionGraph eg = createExecutionGraph();
+
+		final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+			(TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+		failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// start job scheduling
+		eg.scheduleForExecution();
+		manualMainThreadExecutor.triggerAll();
+
+		// fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22}
+		ev11.getCurrentExecutionAttempt().fail(new Exception("task failure"));
+		manualMainThreadExecutor.triggerAll();
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		assertEquals(ExecutionState.FAILED, ev11.getExecutionState());
+		assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+		assertEquals(ExecutionState.CANCELED, ev21.getExecutionState());
+		assertEquals(ExecutionState.CANCELED, ev22.getExecutionState());
+
+		// trigger a global failover, complete the cancellation, and immediately trigger the recovery
+		eg.failGlobal(new Exception("Test global failure"));
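+		// ev12 must reach a terminal state before the restart strategy can restart the job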
+		ev12.getCurrentExecutionAttempt().completeCancelling();
+		manuallyTriggeredRestartStrategy.triggerNextAction();
+		manualMainThreadExecutor.triggerAll();
+
+		// verify the job state and vertex attempt number
+		assertEquals(2, eg.getGlobalModVersion());
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+
+		// complete region failover blocker to trigger region failover
+		failoverStrategy.getBlockerFuture().complete(null);
+		manualMainThreadExecutor.triggerAll();
+
+		// verify that no task is restarted by region failover
+		assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
+		assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState());
+		assertEquals(ExecutionState.CREATED, ev21.getExecutionState());
+		assertEquals(ExecutionState.CREATED, ev22.getExecutionState());
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+	}
+
+	@Test
+	public void testSkipFailoverIfExecutionStateIsNotRunning() throws Exception {
+		final ExecutionGraph executionGraph = createExecutionGraph();
+
+		final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
+		final ExecutionVertex firstVertex = vertexIterator.next();
+
+		executionGraph.cancel();
+
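+		// the failover must be skipped because the execution is no longer in a running state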
+		final FailoverStrategy failoverStrategy = executionGraph.getFailoverStrategy();
+		failoverStrategy.onTaskFailure(firstVertex.getCurrentExecutionAttempt(), new Exception("Test Exception"));
+		manualMainThreadExecutor.triggerAll();
+
+		assertEquals(ExecutionState.CANCELED, firstVertex.getExecutionState());
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a sample ExecutionGraph for testing, with the topology shown below.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (blocking)
+	 * </pre>
+	 * There are 4 regions, each consisting of a single execution vertex.
+	 */
+	private ExecutionGraph createExecutionGraph() throws Exception {
+
+		final JobInformation jobInformation = new DummyJobInformation(TEST_JOB_ID, "test job");
+		final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM);
+
+		final Time timeout = Time.seconds(10L);
+		final ExecutionGraph graph = new ExecutionGraph(
+			jobInformation,
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			timeout,
+			manuallyTriggeredRestartStrategy,
+			TestAdaptedRestartPipelinedRegionStrategyNG::new,
+			slotProvider,
+			getClass().getClassLoader(),
+			VoidBlobWriter.getInstance(),
+			timeout);
+
+		JobVertex v1 = new JobVertex("vertex1");
+		v1.setInvokableClass(NoOpInvokable.class);
+		v1.setParallelism(DEFAULT_PARALLELISM);
+
+		JobVertex v2 = new JobVertex("vertex2");
+		v2.setInvokableClass(NoOpInvokable.class);
+		v2.setParallelism(DEFAULT_PARALLELISM);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2);
+		graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
+
+		graph.start(componentMainThreadExecutor);
+
+		return graph;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
new file mode 100644
index 0000000..c4c1c32
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private FailingSlotProviderDecorator slotProvider;
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
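+		// wrap the slot provider so that individual tests can make slot allocation fail on demand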
+		slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14));
+	}
+
+	/**
+	 * Tests region failover for a job in EAGER schedule mode.
+	 * This applies to streaming jobs, which have no BLOCKING edges.
+	 * <pre>
+	 *     (v11) ---> (v21)
+	 *
+	 *     (v12) ---> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (pipelined)
+	 * </pre>
+	 */
+	@Test
+	public void testRegionFailoverInEagerMode() throws Exception {
+		// create a streaming job graph with EAGER schedule mode
+		final JobGraph jobGraph = createStreamingJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// trigger task failure of ev11
+		// vertices { ev11, ev21 } should be affected
+		ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		manualMainThreadExecutor.triggerAll();
+
+		// verify vertex states and complete cancellation
+		assertVertexInState(ExecutionState.FAILED, ev11);
+		assertVertexInState(ExecutionState.DEPLOYING, ev12);
+		assertVertexInState(ExecutionState.CANCELING, ev21);
+		assertVertexInState(ExecutionState.DEPLOYING, ev22);
+		ev21.getCurrentExecutionAttempt().completeCancelling();
+		manualMainThreadExecutor.triggerAll();
+
+		// verify vertex states
+		// in eager mode, all affected vertices should be scheduled in failover
+		assertVertexInState(ExecutionState.DEPLOYING, ev11);
+		assertVertexInState(ExecutionState.DEPLOYING, ev12);
+		assertVertexInState(ExecutionState.DEPLOYING, ev21);
+		assertVertexInState(ExecutionState.DEPLOYING, ev22);
+
+		// verify attempt number
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(0, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+	}
+
+	/**
+	 * Tests the scenario where a task fails due to its own error, in which case the
+	 * region containing the failed task and its consumer regions should be restarted.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// trigger task failure of ev11
+		// regions {ev11}, {ev21}, {ev22} should be affected
+		ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		manualMainThreadExecutor.triggerAll();
+
+		// verify vertex states
+		// only vertices with consumable inputs can be scheduled
+		assertVertexInState(ExecutionState.DEPLOYING, ev11);
+		assertVertexInState(ExecutionState.DEPLOYING, ev12);
+		assertVertexInState(ExecutionState.CREATED, ev21);
+		assertVertexInState(ExecutionState.CREATED, ev22);
+
+		// verify attempt number
+		assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber());
+		assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber());
+	}
+
+	/**
+	 * Tests that the failure is properly propagated to the underlying strategy
+	 * so that it can calculate the tasks to restart.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * </pre>
+	 */
+	@Test
+	public void testFailurePropagationToUnderlyingStrategy() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+			(TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		// finish upstream regions to trigger scheduling of downstream regions
+		ev11.getCurrentExecutionAttempt().markFinished();
+		ev12.getCurrentExecutionAttempt().markFinished();
+
+		// trigger task failure of ev21 on consuming data from ev11
+		Exception taskFailureCause = new PartitionConnectionException(
+			new ResultPartitionID(
+				ev11.getProducedPartitions().keySet().iterator().next(),
+				ev11.getCurrentExecutionAttempt().getAttemptId()),
+			new Exception("Test failure"));
+		ev21.getCurrentExecutionAttempt().fail(taskFailureCause);
+		manualMainThreadExecutor.triggerAll();
+
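+		// the producer region {ev11} must be restarted as well, because the failure is attributed to consuming its result partition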
+		assertThat(failoverStrategy.getLastTasksToRestart(),
+			containsInAnyOrder(ev11.getID(), ev21.getID(), ev22.getID()));
+	}
+
+	/**
+	 * Tests that when a task fails and the restart strategy does not allow restarts, the job goes to FAILED.
+	 */
+	@Test
+	public void testNoRestart() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final NoRestartStrategy restartStrategy = new NoRestartStrategy();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy);
+
+		final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+		ev.fail(new Exception("Test Exception"));
+
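+		// the failure cancels all other vertices; complete their cancellation so the job can reach FAILED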
+		for (ExecutionVertex evs : eg.getAllExecutionVertices()) {
+			evs.getCurrentExecutionAttempt().completeCancelling();
+		}
+
+		manualMainThreadExecutor.triggerAll();
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	@Test
+	public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
+		final JobGraph jobGraph = createStreamingJobGraph();
+		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIterator.next();
+		final ExecutionVertex ev12 = vertexIterator.next();
+		final ExecutionVertex ev21 = vertexIterator.next();
+		final ExecutionVertex ev22 = vertexIterator.next();
+
+		final long globalModVersionBeforeFailure = eg.getGlobalModVersion();
+
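+		// let slot allocation fail so that restarting the failed region escalates to a global failure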
+		slotProvider.setFailSlotAllocation(true);
+		ev11.fail(new Exception("Test Exception"));
+		completeCancelling(ev11, ev12, ev21, ev22);
+
+		manualMainThreadExecutor.triggerAll();
+
+		final long globalModVersionAfterFailure = eg.getGlobalModVersion();
+
+		assertNotEquals(globalModVersionBeforeFailure, globalModVersionAfterFailure);
+	}
+
+	// ------------------------------- Test Utils -----------------------------------------
+
+	/**
+	 * Creates a job graph as shown below (execution view).
+	 * It is representative of a streaming job.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (pipelined)
+	 * </pre>
+	 * There are 2 regions, each consisting of 2 vertices connected by a pipelined edge.
+	 */
+	private JobGraph createStreamingJobGraph() {
+		final JobVertex v1 = new JobVertex("vertex1");
+		final JobVertex v2 = new JobVertex("vertex2");
+
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob", v1, v2);
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Creates a job graph as shown below (execution view).
+	 * It is representative of a batch job.
+	 * <pre>
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * </pre>
+	 * There are 4 regions, each consisting of a single vertex.
+	 */
+	private JobGraph createBatchJobGraph() {
+		final JobVertex v1 = new JobVertex("vertex1");
+		final JobVertex v2 = new JobVertex("vertex2");
+
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(v1, v2);
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+
+		return jobGraph;
+	}
+
+	private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+		return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10));
+	}
+
+	private ExecutionGraph createExecutionGraph(
+			final JobGraph jobGraph,
+			final RestartStrategy restartStrategy) throws Exception {
+
+		final ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobGraph.getJobID(),
+				jobGraph.getName()),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			restartStrategy,
+			TestAdaptedRestartPipelinedRegionStrategyNG::new,
+			slotProvider);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		eg.setScheduleMode(jobGraph.getScheduleMode());
+
+		eg.start(componentMainThreadExecutor);
+		eg.scheduleForExecution();
+		manualMainThreadExecutor.triggerAll();
+
+		return eg;
+	}
+
+	private static void assertVertexInState(final ExecutionState state, final ExecutionVertex vertex) {
+		assertEquals(state, vertex.getExecutionState());
+	}
+
+	private static void completeCancelling(ExecutionVertex... executionVertices) {
+		for (final ExecutionVertex executionVertex : executionVertices) {
+			executionVertex.getCurrentExecutionAttempt().completeCancelling();
+		}
+	}
+
+	/**
+	 * Test implementation of {@link AdaptedRestartPipelinedRegionStrategyNG} that makes it possible
+	 * to control, via a {@link CompletableFuture}, when the failover action is performed.
+	 * It also exposes some internal state of {@link AdaptedRestartPipelinedRegionStrategyNG}.
+	 */
+	static class TestAdaptedRestartPipelinedRegionStrategyNG extends AdaptedRestartPipelinedRegionStrategyNG {
+
+		private CompletableFuture<?> blockerFuture;
+
+		private Set<ExecutionVertexID> lastTasksToRestart;
+
+		TestAdaptedRestartPipelinedRegionStrategyNG(ExecutionGraph executionGraph) {
+			super(executionGraph);
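+			// by default the blocker is already completed, so failovers proceed without delay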
+			this.blockerFuture = CompletableFuture.completedFuture(null);
+		}
+
+		void setBlockerFuture(CompletableFuture<?> blockerFuture) {
+			this.blockerFuture = blockerFuture;
+		}
+
+		@Override
+		protected void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
+			this.lastTasksToRestart = verticesToRestart;
+			super.restartTasks(verticesToRestart);
+		}
+
+		@Override
+		protected CompletableFuture<?> cancelTasks(final Set<ExecutionVertexID> vertices) {
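+			// combine the real cancellation future with the blocker so that tests can control when the restart happens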
+			final List<CompletableFuture<?>> terminationAndBlocker = Arrays.asList(
+				super.cancelTasks(vertices),
+				blockerFuture);
+			return FutureUtils.waitForAll(terminationAndBlocker);
+		}
+
+		CompletableFuture<?> getBlockerFuture() {
+			return blockerFuture;
+		}
+
+		Set<ExecutionVertexID> getLastTasksToRestart() {
+			return lastTasksToRestart;
+		}
+	}
+
+	private static class FailingSlotProviderDecorator implements SlotProvider {
+
+		private final SlotProvider delegate;
+
+		private boolean failSlotAllocation = false;
+
+		FailingSlotProviderDecorator(final SlotProvider delegate) {
+			this.delegate = checkNotNull(delegate);
+		}
+
+		@Override
+		public CompletableFuture<LogicalSlot> allocateSlot(
+				final SlotRequestId slotRequestId,
+				final ScheduledUnit scheduledUnit,
+				final SlotProfile slotProfile,
+				final boolean allowQueuedScheduling,
+				final Time allocationTimeout) {
+			if (failSlotAllocation) {
+				return FutureUtils.completedExceptionally(new TimeoutException("Expected"));
+			}
+			return delegate.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
+		}
+
+		@Override
+		public void cancelSlotRequest(
+				final SlotRequestId slotRequestId,
+				@Nullable final SlotSharingGroupId slotSharingGroupId,
+				final Throwable cause) {
+			delegate.cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
+		}
+
+		public void setFailSlotAllocation(final boolean failSlotAllocation) {
+			this.failSlotAllocation = failSlotAllocation;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1e6be72..e78fa6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -68,6 +70,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -264,6 +267,30 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	}
 
+	@Test
+	public void testTaskFailingWhileGlobalFailing() throws Exception {
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.setFailoverStrategyFactory(new TestFailoverStrategy.Factory())
+				.buildAndScheduleForExecution(slotPool);
+			final TestFailoverStrategy failoverStrategy = (TestFailoverStrategy) graph.getFailoverStrategy();
+
+			// switch all tasks to running
+			for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+				vertex.getCurrentExecutionAttempt().switchToRunning();
+			}
+
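+			// put the graph into global failover; all tasks are now being cancelled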
+			graph.failGlobal(new Exception("test"));
+
+			graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test task failure"));
+
+			// no local failover should happen when in global failover cancelling
+			assertEquals(0, failoverStrategy.getLocalFailoverCount());
+		}
+
+	}
+
 	private void switchAllTasksToRunning(ExecutionGraph graph) {
 		executeOperationForAllExecutions(graph, Execution::switchToRunning);
 	}
@@ -659,6 +686,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static class TestingExecutionGraphBuilder {
 		private RestartStrategy restartStrategy = new NoRestartStrategy();
+		private FailoverStrategy.Factory failoverStrategyFactory = new RestartAllStrategy.Factory();
 		private JobGraph jobGraph = createJobGraph();
 		private int tasksNum = NUM_TASKS;
 		private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
@@ -668,6 +696,11 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			return this;
 		}
 
+		private TestingExecutionGraphBuilder setFailoverStrategyFactory(FailoverStrategy.Factory failoverStrategyFactory) {
+			this.failoverStrategyFactory = failoverStrategyFactory;
+			return this;
+		}
+
 		private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) {
 			this.jobGraph = jobGraph;
 			return this;
@@ -689,7 +722,11 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception {
 			final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation);
-			final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph);
+			final ExecutionGraph eg = createSimpleExecutionGraph(
+				restartStrategy,
+				failoverStrategyFactory,
+				scheduler,
+				jobGraph);
 
 			assertEquals(JobStatus.CREATED, eg.getState());
 
@@ -744,18 +781,32 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	}
 
 	private static ExecutionGraph createSimpleExecutionGraph(
-		RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph)
-		throws IOException, JobException {
+		final RestartStrategy restartStrategy,
+		final SlotProvider slotProvider,
+		final JobGraph jobGraph) throws IOException, JobException {
 
-		ExecutionGraph executionGraph = new ExecutionGraph(
+		return createSimpleExecutionGraph(restartStrategy, new RestartAllStrategy.Factory(), slotProvider, jobGraph);
+	}
+
+	private static ExecutionGraph createSimpleExecutionGraph(
+		final RestartStrategy restartStrategy,
+		final FailoverStrategy.Factory failoverStrategyFactory,
+		final SlotProvider slotProvider,
+		final JobGraph jobGraph) throws IOException, JobException {
+
+		final ExecutionGraph executionGraph = new ExecutionGraph(
+			new JobInformation(
+				TEST_JOB_ID,
+				"Test job",
+				new SerializedValue<>(new ExecutionConfig()),
+				new Configuration(),
+				Collections.emptyList(),
+				Collections.emptyList()),
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			TEST_JOB_ID,
-			"Test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			restartStrategy,
+			failoverStrategyFactory,
 			slotProvider);
 
 		executionGraph.start(mainThreadExecutor);
@@ -779,4 +830,45 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		finishAllVertices(eg);
 		assertEquals(JobStatus.FINISHED, eg.getState());
 	}
+
+	/**
+	 * Test failover strategy which records local failover count.
+	 */
+	static class TestFailoverStrategy extends FailoverStrategy {
+
+		private int localFailoverCount = 0;
+
+		@Override
+		public void onTaskFailure(Execution taskExecution, Throwable cause) {
+			localFailoverCount++;
+		}
+
+		@Override
+		public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+		}
+
+		@Override
+		public String getStrategyName() {
+			return "Test Failover Strategy";
+		}
+
+		int getLocalFailoverCount() {
+			return localFailoverCount;
+		}
+
+		// ------------------------------------------------------------------------
+		//  factory
+		// ------------------------------------------------------------------------
+
+		/**
+		 * Factory that instantiates the TestFailoverStrategy.
+		 */
+		public static class Factory implements FailoverStrategy.Factory {
+
+			@Override
+			public FailoverStrategy create(ExecutionGraph executionGraph) {
+				return new TestFailoverStrategy();
+			}
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index b9c1322..f17dddc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -627,7 +627,7 @@ public class PipelinedFailoverRegionBuildingTest extends TestLogger {
 		final Configuration jobManagerConfig = new Configuration();
 		jobManagerConfig.setString(
 				JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
-				FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
+				FailoverStrategyLoader.LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME);
 
 		final Time timeout = Time.seconds(10L);
 		return ExecutionGraphBuilder.buildGraph(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
index 81f5e38..1063222 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java
@@ -524,7 +524,7 @@ public class RestartPipelinedRegionStrategyBuildingTest extends TestLogger {
 	//  utilities
 	// ------------------------------------------------------------------------
 
-	private static void assertSameRegion(FailoverRegion ...regions) {
+	public static void assertSameRegion(FailoverRegion ...regions) {
 		checkNotNull(regions);
 		for (int i = 0; i < regions.length; i++) {
 			for (int j = i + 1; j < regions.length; j++) {
@@ -533,7 +533,7 @@ public class RestartPipelinedRegionStrategyBuildingTest extends TestLogger {
 		}
 	}
 
-	private static void assertDistinctRegions(FailoverRegion ...regions) {
+	public static void assertDistinctRegions(FailoverRegion ...regions) {
 		checkNotNull(regions);
 		for (int i = 0; i < regions.length; i++) {
 			for (int j = i + 1; j < regions.length; j++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
new file mode 100644
index 0000000..9ba26ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ExecutionVertexVersioner}.
+ */
+public class ExecutionVertexVersionerTest extends TestLogger {
+
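+	// two distinct execution vertices (different job vertices, both with subtask index 0)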
+	private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID1 = new ExecutionVertexID(new JobVertexID(), 0);
+	private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID2 = new ExecutionVertexID(new JobVertexID(), 0);
+	private static final Collection<ExecutionVertexID> TEST_ALL_EXECUTION_VERTEX_IDS = Arrays.asList(
+		TEST_EXECUTION_VERTEX_ID1,
+		TEST_EXECUTION_VERTEX_ID2);
+
+	private ExecutionVertexVersioner executionVertexVersioner;
+
+	@Before
+	public void setUp() {
+		executionVertexVersioner = new ExecutionVertexVersioner();
+	}
+
+	@Test
+	public void isModifiedReturnsFalseIfVertexUnmodified() {
+		final ExecutionVertexVersion executionVertexVersion =
+			executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+		assertFalse(executionVertexVersioner.isModified(executionVertexVersion));
+	}
+
+	@Test
+	public void isModifiedReturnsTrueIfVertexIsModified() {
+		final ExecutionVertexVersion executionVertexVersion =
+			executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+		executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+		assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
+	}
+
+	@Test
+	public void throwsExceptionIfVertexWasNeverModified() {
+		try {
+			executionVertexVersioner.isModified(new ExecutionVertexVersion(TEST_EXECUTION_VERTEX_ID1, 0));
+			fail("Expected exception not thrown");
+		} catch (final IllegalStateException e) {
+			assertThat(e.getMessage(), containsString("Execution vertex "
+				+ TEST_EXECUTION_VERTEX_ID1 + " does not have a recorded version"));
+		}
+	}
+
+	@Test
+	public void getUnmodifiedVerticesAllVerticesModified() {
+		final Set<ExecutionVertexVersion> executionVertexVersions = new HashSet<>(
+			executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values());
+		executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS);
+
+		final Set<ExecutionVertexID> unmodifiedExecutionVertices =
+			executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+
+		assertThat(unmodifiedExecutionVertices, is(empty()));
+	}
+
+	@Test
+	public void getUnmodifiedVerticesNoVertexModified() {
+		final Set<ExecutionVertexVersion> executionVertexVersions = new HashSet<>(
+			executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values());
+
+		final Set<ExecutionVertexID> unmodifiedExecutionVertices =
+			executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+
+		assertThat(unmodifiedExecutionVertices, containsInAnyOrder(TEST_EXECUTION_VERTEX_ID1, TEST_EXECUTION_VERTEX_ID2));
+	}
+
+	@Test
+	public void getUnmodifiedVerticesPartOfVerticesModified() {
+		final Set<ExecutionVertexVersion> executionVertexVersions = new HashSet<>(
+			executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values());
+		executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1);
+
+		final Set<ExecutionVertexID> unmodifiedExecutionVertices =
+			executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+
+		assertThat(unmodifiedExecutionVertices, containsInAnyOrder(TEST_EXECUTION_VERTEX_ID2));
+	}
+}