You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/05 08:19:54 UTC
[flink] branch master updated: [FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fd85207 [FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy
fd85207 is described below
commit fd85207c946683f33b5f5f5d0d644c2ba2ccb381
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 5 10:19:36 2019 +0200
[FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy
---
.../AdaptedRestartPipelinedRegionStrategyNG.java | 10 ++++++
...inedRegionStrategyNGConcurrentFailoverTest.java | 2 ++
...startPipelinedRegionStrategyNGFailoverTest.java | 38 ++++++++++++++++++++--
3 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 5fddac3..3eef502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -31,6 +31,8 @@ import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.SchedulingUtils;
import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -112,6 +114,14 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
}
private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
+ final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
+ return () -> restartStrategy.restart(
+ createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
+ executionGraph.getJobMasterMainThreadExecutor()
+ );
+ }
+
+ private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
return () -> {
if (!isLocalFailoverValid(globalModVersion)) {
LOG.info("Skip current region failover as a global failover is ongoing.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
index 20fb083..f5461f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -126,6 +126,8 @@ public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest exten
// complete region failover blocker to trigger region failover recovery
failoverStrategy.getBlockerFuture().complete(null);
manualMainThreadExecutor.triggerAll();
+ manuallyTriggeredRestartStrategy.triggerAll();
+ manualMainThreadExecutor.triggerAll();
// verify that all tasks are recovered and no task is restarted more than once
assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index c4c1c32..ce7bf79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -28,8 +28,9 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -117,6 +118,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
// vertices { ev11, ev21 } should be affected
ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
manualMainThreadExecutor.triggerAll();
+ manualMainThreadExecutor.triggerScheduledTasks();
// verify vertex states and complete cancellation
assertVertexInState(ExecutionState.FAILED, ev11);
@@ -125,6 +127,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
assertVertexInState(ExecutionState.DEPLOYING, ev22);
ev21.getCurrentExecutionAttempt().completeCancelling();
manualMainThreadExecutor.triggerAll();
+ manualMainThreadExecutor.triggerScheduledTasks();
// verify vertex states
// in eager mode, all affected vertices should be scheduled in failover
@@ -168,6 +171,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
// regions {ev11}, {ev21}, {ev22} should be affected
ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
manualMainThreadExecutor.triggerAll();
+ manualMainThreadExecutor.triggerScheduledTasks();
// verify vertex states
// only vertices with consumable inputs can be scheduled
@@ -249,6 +253,35 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
assertEquals(JobStatus.FAILED, eg.getState());
}
+ /**
+ * Tests that the restart logic of the failover strategy only runs once the restart strategy
+ * calls {@link RestartCallback#triggerFullRecovery()}; a restart strategy that never calls it halts the failover.
+ */
+ @Test
+ public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception {
+ final JobGraph jobGraph = createBatchJobGraph();
+ final TestRestartStrategy restartStrategy = new TestRestartStrategy();
+
+ final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy);
+
+ final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+ ev.fail(new Exception("Test Exception"));
+
+ manualMainThreadExecutor.triggerAll();
+
+ // the restart strategy only queues the restart action and never triggers it, so the failover is halted here;
+ // the only thing the failover strategy should have done at this point is cancel the tasks that require it
+
+ // sanity check to ensure we actually called into the restart strategy (JUnit order: expected, actual)
+ assertEquals(1, restartStrategy.getNumberOfQueuedActions());
+ // 3 out of 4 tasks will be canceled, and removed from the set of registered executions
+ assertEquals(1, eg.getRegisteredExecutions().size());
+ // no job state change should occur; a region failover never switches the job to RESTARTING/CANCELED
+ // the important thing is that we don't switch towards FAILED, which would imply that a global failover started
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ }
+
@Test
public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
final JobGraph jobGraph = createStreamingJobGraph();
@@ -267,6 +300,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
completeCancelling(ev11, ev12, ev21, ev22);
manualMainThreadExecutor.triggerAll();
+ manualMainThreadExecutor.triggerScheduledTasks();
final long globalModVersionAfterFailure = eg.getGlobalModVersion();
@@ -340,7 +374,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
}
private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
- return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10));
+ return createExecutionGraph(jobGraph, new FixedDelayRestartStrategy(10, 0));
}
private ExecutionGraph createExecutionGraph(