You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/08/07 07:54:30 UTC

[flink] branch release-1.9 updated: [FLINK-13452][runtime] Fail the job globally when exception happens during resetting tasks of a region

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new b931e9c  [FLINK-13452][runtime] Fail the job globally when exception happens during resetting tasks of a region
b931e9c is described below

commit b931e9c10b231ed1823fe6a97bccf73bb835dbc2
Author: Yun Tang <my...@live.com>
AuthorDate: Tue Jul 30 03:57:46 2019 +0800

    [FLINK-13452][runtime] Fail the job globally when exception happens during resetting tasks of a region
---
 .../flink/runtime/concurrent/FutureUtils.java      | 55 +++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java     | 10 ++-
 .../AdaptedRestartPipelinedRegionStrategyNG.java   | 17 ++--
 .../restart/FailureRateRestartStrategy.java        | 16 ++--
 .../restart/FixedDelayRestartStrategy.java         |  8 +-
 .../executiongraph/restart/NoRestartStrategy.java  |  4 +-
 .../executiongraph/restart/RestartStrategy.java    |  5 +-
 .../restart/ThrowingRestartStrategy.java           |  4 +-
 .../flink/runtime/concurrent/FutureUtilsTest.java  | 59 ++++++++++++++
 ...startPipelinedRegionStrategyNGFailoverTest.java | 67 +---------------
 .../executiongraph/TestRestartStrategy.java        |  5 +-
 .../restart/FailingRestartStrategy.java            | 91 ++++++++++++++++++++++
 .../restart/InfiniteDelayRestartStrategy.java      |  6 +-
 .../test/checkpointing/RegionFailoverITCase.java   | 23 ++++--
 14 files changed, 273 insertions(+), 97 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index c1613c5..47dbcc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -186,6 +186,61 @@ public class FutureUtils {
 			scheduledExecutor);
 	}
 
+	/**
+	 * Schedule the operation with the given delay.
+	 *
+	 * @param operation to schedule
+	 * @param delay delay to schedule
+	 * @param scheduledExecutor executor to be used for the operation
+	 * @return Future which is completed once the operation has run after the given delay, or completed exceptionally if the operation throws.
+	 */
+	public static CompletableFuture<Void> scheduleWithDelay(
+			final Runnable operation,
+			final Time delay,
+			final ScheduledExecutor scheduledExecutor) {
+		Supplier<Void> operationSupplier = () -> {
+			operation.run();
+			return null;
+		};
+		return scheduleWithDelay(operationSupplier, delay, scheduledExecutor);
+	}
+
+	/**
+	 * Schedule the operation with the given delay.
+	 *
+	 * @param operation to schedule
+	 * @param delay delay to schedule
+	 * @param scheduledExecutor executor to be used for the operation
+	 * @param <T> type of the result
+	 * @return Future which is completed with the operation's result after the given delay, or completed exceptionally if the operation throws.
+	 */
+	public static <T> CompletableFuture<T> scheduleWithDelay(
+			final Supplier<T> operation,
+			final Time delay,
+			final ScheduledExecutor scheduledExecutor) {
+		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+		ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+			() -> {
+				try {
+					resultFuture.complete(operation.get());
+				} catch (Throwable t) {
+					resultFuture.completeExceptionally(t);
+				}
+			},
+			delay.getSize(),
+			delay.getUnit()
+		);
+
+		resultFuture.whenComplete(
+			(t, throwable) -> {
+				if (!scheduledFuture.isDone()) {
+					scheduledFuture.cancel(false);
+				}
+			});
+		return resultFuture;
+	}
+
 	private static <T> void retryOperationWithDelay(
 			final CompletableFuture<T> resultFuture,
 			final Supplier<CompletableFuture<T>> operation,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0984274..e8134e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1215,6 +1215,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 			scheduleForExecution();
 		}
+		// TODO remove the catch block if we align the semantics to not fail global within the restarter.
 		catch (Throwable t) {
 			LOG.warn("Failed to restart the job.", t);
 			failGlobal(t);
@@ -1433,8 +1434,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
 
 					RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
-					restartStrategy.restart(restarter, getJobMasterMainThreadExecutor());
-
+					FutureUtils.assertNoException(
+						restartStrategy
+							.restart(restarter, getJobMasterMainThreadExecutor())
+							.exceptionally((throwable) -> {
+								failGlobal(throwable);
+								return null;
+							}));
 					return true;
 				}
 				else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index ddb60af..164cd91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -54,6 +54,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -108,16 +109,18 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
 
 		FutureUtils.assertNoException(
 			cancelTasks(verticesToRestart)
-				.thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
+				.thenComposeAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
 				.handle(failGlobalOnError()));
 	}
 
-	private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
-		final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
-		return () -> restartStrategy.restart(
-			createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
-			executionGraph.getJobMasterMainThreadExecutor()
-		);
+	private Function<Object, CompletableFuture<Void>> resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
+		return (ignored) -> {
+			final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
+			return restartStrategy.restart(
+				createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
+				executionGraph.getJobMasterMainThreadExecutor()
+			);
+		};
 	}
 
 	private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 78e51ab..0bdc3ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -22,15 +22,17 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 
-import scala.concurrent.duration.Duration;
-
 import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.Duration;
+
 /**
  * Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
  * with a fixed time delay in between.
@@ -68,18 +70,12 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+	public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) {
 		if (isRestartTimestampsQueueFull()) {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-
-		executor.schedule(new Runnable() {
-			@Override
-			public void run() {
-				restarter.triggerFullRecovery();
-			}
-		}, delayInterval.getSize(), delayInterval.getUnit());
+		return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, delayInterval, executor);
 	}
 
 	private boolean isRestartTimestampsQueueFull() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 439f806..5097bd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
 import scala.concurrent.duration.Duration;
 
@@ -60,9 +62,9 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+	public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) {
 		currentRestartAttempt++;
-		executor.schedule(restarter::triggerFullRecovery, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
+		return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, Time.milliseconds(delayBetweenRestartAttempts), executor);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index b639614..7c93b1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Restart strategy which does not restart an {@link ExecutionGraph}.
  */
@@ -33,7 +35,7 @@ public class NoRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+	public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
 		throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index ffa2777..f137a9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph.restart;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Strategy for {@link ExecutionGraph} restarts.
  */
@@ -42,6 +44,7 @@ public interface RestartStrategy {
 	 *
 	 * @param restarter The hook to restart the ExecutionGraph
 	 * @param executor An scheduled executor to delay the restart
+	 * @return A {@link CompletableFuture} that will be completed when the restarting process is done.
 	 */
-	void restart(RestartCallback restarter, ScheduledExecutor executor);
+	CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
index e7355df..f017338 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 
+import java.util.concurrent.CompletableFuture;
+
 
 /**
  * A restart strategy that validates that it is not in use by throwing {@link IllegalStateException}
@@ -34,7 +36,7 @@ public class ThrowingRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final RestartCallback restarter, final ScheduledExecutor executor) {
+	public CompletableFuture<Void> restart(final RestartCallback restarter, final ScheduledExecutor executor) {
 		throw new IllegalStateException("Unexpected restart() call");
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index e16771c..95ee97a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -236,6 +236,65 @@ public class FutureUtilsTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that an operation scheduled with a delay is executed and that its result completes the returned future.
+	 */
+	@Test
+	public void testScheduleWithDelay() throws Exception {
+		final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
+
+		final int expectedResult = 42;
+		CompletableFuture<Integer> completableFuture = FutureUtils.scheduleWithDelay(
+			() -> expectedResult,
+			Time.milliseconds(0),
+			scheduledExecutor);
+
+		scheduledExecutor.triggerScheduledTasks();
+		final int actualResult = completableFuture.get();
+
+		assertEquals(expectedResult, actualResult);
+	}
+
+	/**
+	 * Tests that the scheduled task is cancelled if the returned future is cancelled.
+	 */
+	@Test
+	public void testScheduleWithDelayCancellation() {
+		final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
+
+		final Runnable noOpRunnable = () -> {};
+		CompletableFuture<Void> completableFuture = FutureUtils.scheduleWithDelay(
+			noOpRunnable,
+			TestingUtils.infiniteTime(),
+			scheduledExecutor);
+
+		final ScheduledFuture<?> scheduledFuture = scheduledExecutor
+			.getScheduledTasks()
+			.iterator()
+			.next();
+
+		completableFuture.cancel(false);
+
+		assertTrue(completableFuture.isCancelled());
+		assertTrue(scheduledFuture.isCancelled());
+	}
+
+	/**
+	 * Tests that the operation is never scheduled if the delay is virtually infinite.
+	 */
+	@Test
+	public void testScheduleWithInfiniteDelayNeverSchedulesOperation() {
+		final Runnable noOpRunnable = () -> {};
+		final CompletableFuture<Void> completableFuture = FutureUtils.scheduleWithDelay(
+			noOpRunnable,
+			TestingUtils.infiniteTime(),
+			TestingUtils.defaultScheduledExecutor());
+
+		assertFalse(completableFuture.isDone());
+
+		completableFuture.cancel(false);
+	}
+
+	/**
 	 * Tests that a future is timed out after the specified timeout.
 	 */
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index acf9869..abd940b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -19,20 +19,17 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -44,10 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.util.TestLogger;
@@ -55,17 +48,13 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -80,15 +69,12 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 
 	private ComponentMainThreadExecutor componentMainThreadExecutor;
 
-	private FailingSlotProviderDecorator slotProvider;
-
 	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
 
 	@Before
 	public void setUp() {
 		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
 		componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
-		slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14));
 	}
 
 	/**
@@ -285,9 +271,9 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 	}
 
 	@Test
-	public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
+	public void testFailGlobalIfErrorOnRestartTasks() throws Exception {
 		final JobGraph jobGraph = createStreamingJobGraph();
-		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, new FailingRestartStrategy(1));
 
 		final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
 		final ExecutionVertex ev11 = vertexIterator.next();
@@ -297,7 +283,6 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 
 		final long globalModVersionBeforeFailure = eg.getGlobalModVersion();
 
-		slotProvider.setFailSlotAllocation(true);
 		ev11.fail(new Exception("Test Exception"));
 		completeCancelling(ev11, ev12, ev21, ev22);
 
@@ -391,7 +376,6 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
 			.setRestartStrategy(restartStrategy)
 			.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
-			.setSlotProvider(slotProvider)
 			.setPartitionTracker(partitionTracker)
 			.build();
 
@@ -454,49 +438,4 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 			return lastTasksToRestart;
 		}
 	}
-
-	private static class FailingSlotProviderDecorator implements SlotProvider {
-
-		private final SlotProvider delegate;
-
-		private boolean failSlotAllocation = false;
-
-		FailingSlotProviderDecorator(final SlotProvider delegate) {
-			this.delegate = checkNotNull(delegate);
-		}
-
-		@Override
-		public CompletableFuture<LogicalSlot> allocateBatchSlot(
-			final SlotRequestId slotRequestId,
-			final ScheduledUnit scheduledUnit,
-			final SlotProfile slotProfile,
-			final boolean allowQueuedScheduling) {
-			return allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, null);
-		}
-
-		@Override
-		public CompletableFuture<LogicalSlot> allocateSlot(
-				final SlotRequestId slotRequestId,
-				final ScheduledUnit scheduledUnit,
-				final SlotProfile slotProfile,
-				final boolean allowQueuedScheduling,
-				final Time allocationTimeout) {
-			if (failSlotAllocation) {
-				return FutureUtils.completedExceptionally(new TimeoutException("Expected"));
-			}
-			return delegate.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
-		}
-
-		@Override
-		public void cancelSlotRequest(
-				final SlotRequestId slotRequestId,
-				@Nullable final SlotSharingGroupId slotSharingGroupId,
-				final Throwable cause) {
-			delegate.cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
-		}
-
-		public void setFailSlotAllocation(final boolean failSlotAllocation) {
-			this.failSlotAllocation = failSlotAllocation;
-		}
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
index dd7053f..9a25c42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
@@ -72,7 +72,7 @@ public class TestRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+	public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
 
 		++restartAttempts;
 		ExecutorAction executorAction = new ExecutorAction(restarter::triggerFullRecovery, executor);
@@ -80,8 +80,9 @@ public class TestRestartStrategy implements RestartStrategy {
 			synchronized (actionsQueue) {
 				actionsQueue.add(executorAction);
 			}
+			return new CompletableFuture<>();
 		} else {
-			executorAction.trigger();
+			return executorAction.trigger();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailingRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailingRestartStrategy.java
new file mode 100644
index 0000000..fbeab79
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailingRestartStrategy.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A restart strategy which fails a predefined number of times.
+ */
+public class FailingRestartStrategy implements RestartStrategy {
+
+	public static final ConfigOption<Integer> NUM_FAILURES_CONFIG_OPTION = ConfigOptions
+		.key("restart-strategy.failing.failures")
+		.defaultValue(1);
+
+	private final int numberOfFailures;
+
+	private int restartedTimes;
+
+	public FailingRestartStrategy(@Nonnegative int numberOfFailures) {
+		this.numberOfFailures = numberOfFailures;
+	}
+
+	@Override
+	public boolean canRestart() {
+		return true;
+	}
+
+	@Override
+	public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
+		++restartedTimes;
+
+		if (restartedTimes <= numberOfFailures) {
+			return FutureUtils.completedExceptionally(new FlinkRuntimeException("Fail to restart for " + restartedTimes + " time(s)."));
+		} else {
+			return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, Time.milliseconds(0L), executor);
+		}
+	}
+
+	/**
+	 * Creates a {@link FailingRestartStrategyFactory} from the given Configuration.
+	 */
+	public static FailingRestartStrategyFactory createFactory(Configuration configuration) {
+		int numberOfFailures = configuration.getInteger(NUM_FAILURES_CONFIG_OPTION);
+		return new FailingRestartStrategyFactory(numberOfFailures);
+	}
+
+	/**
+	 * Factory for {@link FailingRestartStrategy}.
+	 */
+	public static class FailingRestartStrategyFactory extends RestartStrategyFactory {
+		private static final long serialVersionUID = 1L;
+
+		private final int numberOfFailures;
+
+		public FailingRestartStrategyFactory(int numberOfFailures) {
+			this.numberOfFailures = numberOfFailures;
+		}
+
+		@Override
+		public RestartStrategy createRestartStrategy() {
+			return new FailingRestartStrategy(numberOfFailures);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index d145298..7c64d72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Testing restart strategy which promise to restart {@link ExecutionGraph} after the infinite time delay.
  * Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used.
@@ -49,11 +52,12 @@ public class InfiniteDelayRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+	public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
 		LOG.info("Delaying retry of job execution forever");
 
 		if (maxRestartAttempts >= 0) {
 			restartAttemptCounter++;
 		}
+		return new CompletableFuture<>();
 	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index b951b48..3c44c2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -29,8 +28,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -78,10 +79,14 @@ public class RegionFailoverITCase extends TestLogger {
 	private static final int FAIL_BASE = 1000;
 	private static final int NUM_OF_REGIONS = 3;
 	private static final int MAX_PARALLELISM = 2 * NUM_OF_REGIONS;
-	private static final Set<Integer> EXPECTED_INDICES = IntStream.range(0, NUM_OF_REGIONS).boxed().collect(Collectors.toSet());
+	private static final Set<Integer> EXPECTED_INDICES_MULTI_REGION = IntStream.range(0, NUM_OF_REGIONS).boxed().collect(Collectors.toSet());
+	private static final Set<Integer> EXPECTED_INDICES_SINGLE_REGION = Collections.singleton(0);
 	private static final int NUM_OF_RESTARTS = 3;
 	private static final int NUM_ELEMENTS = FAIL_BASE * 10;
 
+	private static final String SINGLE_REGION_SOURCE_NAME = "single-source";
+	private static final String MULTI_REGION_SOURCE_NAME = "multi-source";
+
 	private static AtomicLong lastCompletedCheckpointId = new AtomicLong(0);
 	private static AtomicInteger numCompletedCheckpoints = new AtomicInteger(0);
 	private static AtomicInteger jobFailedCnt = new AtomicInteger(0);
@@ -99,6 +104,9 @@ public class RegionFailoverITCase extends TestLogger {
 	public void setup() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
+		// global failover times: 3, region failover times: NUM_OF_RESTARTS
+		configuration.setInteger(FailingRestartStrategy.NUM_FAILURES_CONFIG_OPTION, 3);
+		configuration.setString(ConfigConstants.RESTART_STRATEGY, FailingRestartStrategy.class.getName());
 
 		cluster = new MiniClusterWithClientResource(
 			new MiniClusterResourceConfiguration.Builder()
@@ -159,12 +167,12 @@ public class RegionFailoverITCase extends TestLogger {
 		env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
 		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 		env.disableOperatorChaining();
-		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NUM_OF_RESTARTS, 0L));
 		env.getConfig().disableSysoutLogging();
 
 		// Use DataStreamUtils#reinterpretAsKeyed to avoid merge regions and this stream graph would exist num of 'NUM_OF_REGIONS' individual regions.
 		DataStreamUtils.reinterpretAsKeyedStream(
 			env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS))
+				.name(MULTI_REGION_SOURCE_NAME)
 				.setParallelism(NUM_OF_REGIONS),
 			(KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
 			TypeInformation.of(Integer.class))
@@ -174,7 +182,8 @@ public class RegionFailoverITCase extends TestLogger {
 			.setParallelism(NUM_OF_REGIONS);
 
 		// another stream graph totally disconnected with the above one.
-		env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)).setParallelism(1)
+		env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)).
+			name(SINGLE_REGION_SOURCE_NAME).setParallelism(1)
 			.map((MapFunction<Tuple2<Integer, Integer>, Object>) value -> value).setParallelism(1);
 
 		return env.getStreamGraph().getJobGraph();
@@ -282,7 +291,11 @@ public class RegionFailoverITCase extends TestLogger {
 
 				unionListState = context.getOperatorStateStore().getUnionListState(unionStateDescriptor);
 				Set<Integer> actualIndices = StreamSupport.stream(unionListState.get().spliterator(), false).collect(Collectors.toSet());
-				Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES, actualIndices));
+				if (getRuntimeContext().getTaskName().contains(SINGLE_REGION_SOURCE_NAME)) {
+					Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES_SINGLE_REGION, actualIndices));
+				} else {
+					Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES_MULTI_REGION, actualIndices));
+				}
 
 				if (indexOfThisSubtask == 0) {
 					listState = context.getOperatorStateStore().getListState(stateDescriptor);