You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/08/07 07:54:30 UTC
[flink] branch release-1.9 updated: [FLINK-13452][runtime] Fail the
job globally when exception happens during reseting tasks of a region
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new b931e9c [FLINK-13452][runtime] Fail the job globally when exception happens during reseting tasks of a region
b931e9c is described below
commit b931e9c10b231ed1823fe6a97bccf73bb835dbc2
Author: Yun Tang <my...@live.com>
AuthorDate: Tue Jul 30 03:57:46 2019 +0800
[FLINK-13452][runtime] Fail the job globally when exception happens during reseting tasks of a region
---
.../flink/runtime/concurrent/FutureUtils.java | 55 +++++++++++++
.../runtime/executiongraph/ExecutionGraph.java | 10 ++-
.../AdaptedRestartPipelinedRegionStrategyNG.java | 17 ++--
.../restart/FailureRateRestartStrategy.java | 16 ++--
.../restart/FixedDelayRestartStrategy.java | 8 +-
.../executiongraph/restart/NoRestartStrategy.java | 4 +-
.../executiongraph/restart/RestartStrategy.java | 5 +-
.../restart/ThrowingRestartStrategy.java | 4 +-
.../flink/runtime/concurrent/FutureUtilsTest.java | 59 ++++++++++++++
...startPipelinedRegionStrategyNGFailoverTest.java | 67 +---------------
.../executiongraph/TestRestartStrategy.java | 5 +-
.../restart/FailingRestartStrategy.java | 91 ++++++++++++++++++++++
.../restart/InfiniteDelayRestartStrategy.java | 6 +-
.../test/checkpointing/RegionFailoverITCase.java | 23 ++++--
14 files changed, 273 insertions(+), 97 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index c1613c5..47dbcc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -186,6 +186,61 @@ public class FutureUtils {
scheduledExecutor);
}
+ /**
+ * Schedule the operation with the given delay.
+ *
+ * @param operation to schedule
+ * @param delay delay to schedule
+ * @param scheduledExecutor executor to be used for the operation
+ * @return Future which schedules the given operation with given delay.
+ */
+ public static CompletableFuture<Void> scheduleWithDelay(
+ final Runnable operation,
+ final Time delay,
+ final ScheduledExecutor scheduledExecutor) {
+ Supplier<Void> operationSupplier = () -> {
+ operation.run();
+ return null;
+ };
+ return scheduleWithDelay(operationSupplier, delay, scheduledExecutor);
+ }
+
+ /**
+ * Schedule the operation with the given delay.
+ *
+ * @param operation to schedule
+ * @param delay delay to schedule
+ * @param scheduledExecutor executor to be used for the operation
+ * @param <T> type of the result
+ * @return Future which schedules the given operation with given delay.
+ */
+ public static <T> CompletableFuture<T> scheduleWithDelay(
+ final Supplier<T> operation,
+ final Time delay,
+ final ScheduledExecutor scheduledExecutor) {
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+ ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+ () -> {
+ try {
+ resultFuture.complete(operation.get());
+ } catch (Throwable t) {
+ resultFuture.completeExceptionally(t);
+ }
+ },
+ delay.getSize(),
+ delay.getUnit()
+ );
+
+ resultFuture.whenComplete(
+ (t, throwable) -> {
+ if (!scheduledFuture.isDone()) {
+ scheduledFuture.cancel(false);
+ }
+ });
+ return resultFuture;
+ }
+
private static <T> void retryOperationWithDelay(
final CompletableFuture<T> resultFuture,
final Supplier<CompletableFuture<T>> operation,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0984274..e8134e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1215,6 +1215,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
scheduleForExecution();
}
+ // TODO remove the catch block if we align the semantics to not fail global within the restarter.
catch (Throwable t) {
LOG.warn("Failed to restart the job.", t);
failGlobal(t);
@@ -1433,8 +1434,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
- restartStrategy.restart(restarter, getJobMasterMainThreadExecutor());
-
+ FutureUtils.assertNoException(
+ restartStrategy
+ .restart(restarter, getJobMasterMainThreadExecutor())
+ .exceptionally((throwable) -> {
+ failGlobal(throwable);
+ return null;
+ }));
return true;
}
else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index ddb60af..164cd91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -54,6 +54,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -108,16 +109,18 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
FutureUtils.assertNoException(
cancelTasks(verticesToRestart)
- .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
+ .thenComposeAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor())
.handle(failGlobalOnError()));
}
- private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
- final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
- return () -> restartStrategy.restart(
- createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
- executionGraph.getJobMasterMainThreadExecutor()
- );
+ private Function<Object, CompletableFuture<Void>> resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
+ return (ignored) -> {
+ final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
+ return restartStrategy.restart(
+ createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
+ executionGraph.getJobMasterMainThreadExecutor()
+ );
+ };
}
private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 78e51ab..0bdc3ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -22,15 +22,17 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
-import scala.concurrent.duration.Duration;
-
import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.Duration;
+
/**
* Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
* with a fixed time delay in between.
@@ -68,18 +70,12 @@ public class FailureRateRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+ public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) {
if (isRestartTimestampsQueueFull()) {
restartTimestampsDeque.remove();
}
restartTimestampsDeque.add(System.currentTimeMillis());
-
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- restarter.triggerFullRecovery();
- }
- }, delayInterval.getSize(), delayInterval.getUnit());
+ return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, delayInterval, executor);
}
private boolean isRestartTimestampsQueueFull() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 439f806..5097bd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -18,13 +18,15 @@
package org.apache.flink.runtime.executiongraph.restart;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
import scala.concurrent.duration.Duration;
@@ -60,9 +62,9 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+ public CompletableFuture<Void> restart(final RestartCallback restarter, ScheduledExecutor executor) {
currentRestartAttempt++;
- executor.schedule(restarter::triggerFullRecovery, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
+ return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, Time.milliseconds(delayBetweenRestartAttempts), executor);
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index b639614..7c93b1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import java.util.concurrent.CompletableFuture;
+
/**
* Restart strategy which does not restart an {@link ExecutionGraph}.
*/
@@ -33,7 +35,7 @@ public class NoRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+ public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index ffa2777..f137a9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import java.util.concurrent.CompletableFuture;
+
/**
* Strategy for {@link ExecutionGraph} restarts.
*/
@@ -42,6 +44,7 @@ public interface RestartStrategy {
*
* @param restarter The hook to restart the ExecutionGraph
* @param executor An scheduled executor to delay the restart
+ * @return A {@link CompletableFuture} that will be completed when the restarting process is done.
*/
- void restart(RestartCallback restarter, ScheduledExecutor executor);
+ CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
index e7355df..f017338 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import java.util.concurrent.CompletableFuture;
+
/**
* A restart strategy that validates that it is not in use by throwing {@link IllegalStateException}
@@ -34,7 +36,7 @@ public class ThrowingRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(final RestartCallback restarter, final ScheduledExecutor executor) {
+ public CompletableFuture<Void> restart(final RestartCallback restarter, final ScheduledExecutor executor) {
throw new IllegalStateException("Unexpected restart() call");
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index e16771c..95ee97a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -236,6 +236,65 @@ public class FutureUtilsTest extends TestLogger {
}
/**
+ * Tests that the operation could be scheduled with expected delay.
+ */
+ @Test
+ public void testScheduleWithDelay() throws Exception {
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
+
+ final int expectedResult = 42;
+ CompletableFuture<Integer> completableFuture = FutureUtils.scheduleWithDelay(
+ () -> expectedResult,
+ Time.milliseconds(0),
+ scheduledExecutor);
+
+ scheduledExecutor.triggerScheduledTasks();
+ final int actualResult = completableFuture.get();
+
+ assertEquals(expectedResult, actualResult);
+ }
+
+ /**
+ * Tests that the scheduled task is cancelled if the returned future is cancelled.
+ */
+ @Test
+ public void testScheduleWithDelayCancellation() {
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
+
+ final Runnable noOpRunnable = () -> {};
+ CompletableFuture<Void> completableFuture = FutureUtils.scheduleWithDelay(
+ noOpRunnable,
+ TestingUtils.infiniteTime(),
+ scheduledExecutor);
+
+ final ScheduledFuture<?> scheduledFuture = scheduledExecutor
+ .getScheduledTasks()
+ .iterator()
+ .next();
+
+ completableFuture.cancel(false);
+
+ assertTrue(completableFuture.isCancelled());
+ assertTrue(scheduledFuture.isCancelled());
+ }
+
+ /**
+ * Tests that the operation is never scheduled if the delay is virtually infinite.
+ */
+ @Test
+ public void testScheduleWithInfiniteDelayNeverSchedulesOperation() {
+ final Runnable noOpRunnable = () -> {};
+ final CompletableFuture<Void> completableFuture = FutureUtils.scheduleWithDelay(
+ noOpRunnable,
+ TestingUtils.infiniteTime(),
+ TestingUtils.defaultScheduledExecutor());
+
+ assertFalse(completableFuture.isDone());
+
+ completableFuture.cancel(false);
+ }
+
+ /**
* Tests that a future is timed out after the specified timeout.
*/
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index acf9869..abd940b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -19,20 +19,17 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -44,10 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.util.TestLogger;
@@ -55,17 +48,13 @@ import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
-import javax.annotation.Nullable;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -80,15 +69,12 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
private ComponentMainThreadExecutor componentMainThreadExecutor;
- private FailingSlotProviderDecorator slotProvider;
-
private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
@Before
public void setUp() {
manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread());
- slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14));
}
/**
@@ -285,9 +271,9 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
}
@Test
- public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
+ public void testFailGlobalIfErrorOnRestartTasks() throws Exception {
final JobGraph jobGraph = createStreamingJobGraph();
- final ExecutionGraph eg = createExecutionGraph(jobGraph);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph, new FailingRestartStrategy(1));
final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
final ExecutionVertex ev11 = vertexIterator.next();
@@ -297,7 +283,6 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
final long globalModVersionBeforeFailure = eg.getGlobalModVersion();
- slotProvider.setFailSlotAllocation(true);
ev11.fail(new Exception("Test Exception"));
completeCancelling(ev11, ev12, ev21, ev22);
@@ -391,7 +376,6 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
.setRestartStrategy(restartStrategy)
.setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new)
- .setSlotProvider(slotProvider)
.setPartitionTracker(partitionTracker)
.build();
@@ -454,49 +438,4 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
return lastTasksToRestart;
}
}
-
- private static class FailingSlotProviderDecorator implements SlotProvider {
-
- private final SlotProvider delegate;
-
- private boolean failSlotAllocation = false;
-
- FailingSlotProviderDecorator(final SlotProvider delegate) {
- this.delegate = checkNotNull(delegate);
- }
-
- @Override
- public CompletableFuture<LogicalSlot> allocateBatchSlot(
- final SlotRequestId slotRequestId,
- final ScheduledUnit scheduledUnit,
- final SlotProfile slotProfile,
- final boolean allowQueuedScheduling) {
- return allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, null);
- }
-
- @Override
- public CompletableFuture<LogicalSlot> allocateSlot(
- final SlotRequestId slotRequestId,
- final ScheduledUnit scheduledUnit,
- final SlotProfile slotProfile,
- final boolean allowQueuedScheduling,
- final Time allocationTimeout) {
- if (failSlotAllocation) {
- return FutureUtils.completedExceptionally(new TimeoutException("Expected"));
- }
- return delegate.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
- }
-
- @Override
- public void cancelSlotRequest(
- final SlotRequestId slotRequestId,
- @Nullable final SlotSharingGroupId slotSharingGroupId,
- final Throwable cause) {
- delegate.cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
- }
-
- public void setFailSlotAllocation(final boolean failSlotAllocation) {
- this.failSlotAllocation = failSlotAllocation;
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
index dd7053f..9a25c42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
@@ -72,7 +72,7 @@ public class TestRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+ public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
++restartAttempts;
ExecutorAction executorAction = new ExecutorAction(restarter::triggerFullRecovery, executor);
@@ -80,8 +80,9 @@ public class TestRestartStrategy implements RestartStrategy {
synchronized (actionsQueue) {
actionsQueue.add(executorAction);
}
+ return new CompletableFuture<>();
} else {
- executorAction.trigger();
+ return executorAction.trigger();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailingRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailingRestartStrategy.java
new file mode 100644
index 0000000..fbeab79
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailingRestartStrategy.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A restart strategy which fails a predefined number of times.
+ */
+public class FailingRestartStrategy implements RestartStrategy {
+
+ public static final ConfigOption<Integer> NUM_FAILURES_CONFIG_OPTION = ConfigOptions
+ .key("restart-strategy.failing.failures")
+ .defaultValue(1);
+
+ private final int numberOfFailures;
+
+ private int restartedTimes;
+
+ public FailingRestartStrategy(@Nonnegative int numberOfFailures) {
+ this.numberOfFailures = numberOfFailures;
+ }
+
+ @Override
+ public boolean canRestart() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
+ ++restartedTimes;
+
+ if (restartedTimes <= numberOfFailures) {
+ return FutureUtils.completedExceptionally(new FlinkRuntimeException("Fail to restart for " + restartedTimes + " time(s)."));
+ } else {
+ return FutureUtils.scheduleWithDelay(restarter::triggerFullRecovery, Time.milliseconds(0L), executor);
+ }
+ }
+
+ /**
+ * Creates a {@link FailingRestartStrategyFactory} from the given Configuration.
+ */
+ public static FailingRestartStrategyFactory createFactory(Configuration configuration) {
+ int numberOfFailures = configuration.getInteger(NUM_FAILURES_CONFIG_OPTION);
+ return new FailingRestartStrategyFactory(numberOfFailures);
+ }
+
+ /**
+ * Factory for {@link FailingRestartStrategy}.
+ */
+ public static class FailingRestartStrategyFactory extends RestartStrategyFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final int numberOfFailures;
+
+ public FailingRestartStrategyFactory(int numberOfFailures) {
+ this.numberOfFailures = numberOfFailures;
+ }
+
+ @Override
+ public RestartStrategy createRestartStrategy() {
+ return new FailingRestartStrategy(numberOfFailures);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index d145298..7c64d72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
+
/**
* Testing restart strategy which promise to restart {@link ExecutionGraph} after the infinite time delay.
* Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used.
@@ -49,11 +52,12 @@ public class InfiniteDelayRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+ public CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor) {
LOG.info("Delaying retry of job execution forever");
if (maxRestartAttempts >= 0) {
restartAttemptCounter++;
}
+ return new CompletableFuture<>();
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index b951b48..3c44c2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -29,8 +28,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -78,10 +79,14 @@ public class RegionFailoverITCase extends TestLogger {
private static final int FAIL_BASE = 1000;
private static final int NUM_OF_REGIONS = 3;
private static final int MAX_PARALLELISM = 2 * NUM_OF_REGIONS;
- private static final Set<Integer> EXPECTED_INDICES = IntStream.range(0, NUM_OF_REGIONS).boxed().collect(Collectors.toSet());
+ private static final Set<Integer> EXPECTED_INDICES_MULTI_REGION = IntStream.range(0, NUM_OF_REGIONS).boxed().collect(Collectors.toSet());
+ private static final Set<Integer> EXPECTED_INDICES_SINGLE_REGION = Collections.singleton(0);
private static final int NUM_OF_RESTARTS = 3;
private static final int NUM_ELEMENTS = FAIL_BASE * 10;
+ private static final String SINGLE_REGION_SOURCE_NAME = "single-source";
+ private static final String MULTI_REGION_SOURCE_NAME = "multi-source";
+
private static AtomicLong lastCompletedCheckpointId = new AtomicLong(0);
private static AtomicInteger numCompletedCheckpoints = new AtomicInteger(0);
private static AtomicInteger jobFailedCnt = new AtomicInteger(0);
@@ -99,6 +104,9 @@ public class RegionFailoverITCase extends TestLogger {
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
+ // global failover times: 3, region failover times: NUM_OF_RESTARTS
+ configuration.setInteger(FailingRestartStrategy.NUM_FAILURES_CONFIG_OPTION, 3);
+ configuration.setString(ConfigConstants.RESTART_STRATEGY, FailingRestartStrategy.class.getName());
cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
@@ -159,12 +167,12 @@ public class RegionFailoverITCase extends TestLogger {
env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.disableOperatorChaining();
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NUM_OF_RESTARTS, 0L));
env.getConfig().disableSysoutLogging();
// Use DataStreamUtils#reinterpretAsKeyed to avoid merge regions and this stream graph would exist num of 'NUM_OF_REGIONS' individual regions.
DataStreamUtils.reinterpretAsKeyedStream(
env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS))
+ .name(MULTI_REGION_SOURCE_NAME)
.setParallelism(NUM_OF_REGIONS),
(KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0,
TypeInformation.of(Integer.class))
@@ -174,7 +182,8 @@ public class RegionFailoverITCase extends TestLogger {
.setParallelism(NUM_OF_REGIONS);
// another stream graph totally disconnected with the above one.
- env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)).setParallelism(1)
+ env.addSource(new StringGeneratingSourceFunction(NUM_ELEMENTS, NUM_ELEMENTS / NUM_OF_RESTARTS)).
+ name(SINGLE_REGION_SOURCE_NAME).setParallelism(1)
.map((MapFunction<Tuple2<Integer, Integer>, Object>) value -> value).setParallelism(1);
return env.getStreamGraph().getJobGraph();
@@ -282,7 +291,11 @@ public class RegionFailoverITCase extends TestLogger {
unionListState = context.getOperatorStateStore().getUnionListState(unionStateDescriptor);
Set<Integer> actualIndices = StreamSupport.stream(unionListState.get().spliterator(), false).collect(Collectors.toSet());
- Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES, actualIndices));
+ if (getRuntimeContext().getTaskName().contains(SINGLE_REGION_SOURCE_NAME)) {
+ Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES_SINGLE_REGION, actualIndices));
+ } else {
+ Assert.assertTrue(CollectionUtils.isEqualCollection(EXPECTED_INDICES_MULTI_REGION, actualIndices));
+ }
if (indexOfThisSubtask == 0) {
listState = context.getOperatorStateStore().getListState(stateDescriptor);