You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/05 08:19:54 UTC

[flink] branch master updated: [FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fd85207  [FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy
fd85207 is described below

commit fd85207c946683f33b5f5f5d0d644c2ba2ccb381
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 5 10:19:36 2019 +0200

    [FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy
---
 .../AdaptedRestartPipelinedRegionStrategyNG.java   | 10 ++++++
 ...inedRegionStrategyNGConcurrentFailoverTest.java |  2 ++
 ...startPipelinedRegionStrategyNGFailoverTest.java | 38 ++++++++++++++++++++--
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 5fddac3..3eef502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -31,6 +31,8 @@ import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
 import org.apache.flink.runtime.executiongraph.SchedulingUtils;
 import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -112,6 +114,14 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
 	}
 
 	private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
+		final RestartStrategy restartStrategy = executionGraph.getRestartStrategy();
+		return () -> restartStrategy.restart(
+			createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
+			executionGraph.getJobMasterMainThreadExecutor()
+		);
+	}
+
+	private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
 		return () -> {
 			if (!isLocalFailoverValid(globalModVersion)) {
 				LOG.info("Skip current region failover as a global failover is ongoing.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
index 20fb083..f5461f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
@@ -126,6 +126,8 @@ public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest exten
 		// complete region failover blocker to trigger region failover recovery
 		failoverStrategy.getBlockerFuture().complete(null);
 		manualMainThreadExecutor.triggerAll();
+		manuallyTriggeredRestartStrategy.triggerAll();
+		manualMainThreadExecutor.triggerAll();
 
 		// verify that all tasks are recovered and no task is restarted more than once
 		assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index c4c1c32..ce7bf79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -28,8 +28,9 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -117,6 +118,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		// vertices { ev11, ev21 } should be affected
 		ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
 		manualMainThreadExecutor.triggerAll();
+		manualMainThreadExecutor.triggerScheduledTasks();
 
 		// verify vertex states and complete cancellation
 		assertVertexInState(ExecutionState.FAILED, ev11);
@@ -125,6 +127,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		assertVertexInState(ExecutionState.DEPLOYING, ev22);
 		ev21.getCurrentExecutionAttempt().completeCancelling();
 		manualMainThreadExecutor.triggerAll();
+		manualMainThreadExecutor.triggerScheduledTasks();
 
 		// verify vertex states
 		// in eager mode, all affected vertices should be scheduled in failover
@@ -168,6 +171,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		// regions {ev11}, {ev21}, {ev22} should be affected
 		ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
 		manualMainThreadExecutor.triggerAll();
+		manualMainThreadExecutor.triggerScheduledTasks();
 
 		// verify vertex states
 		// only vertices with consumable inputs can be scheduled
@@ -249,6 +253,35 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
+	/**
+	 * Tests that the execution of the restart logic of the failover strategy is dependent on the restart strategy
+	 * calling {@link RestartCallback#triggerFullRecovery()}.
+	 */
+	@Test
+	public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception {
+		final JobGraph jobGraph = createBatchJobGraph();
+		final TestRestartStrategy restartStrategy = new TestRestartStrategy();
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy);
+
+		final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+		ev.fail(new Exception("Test Exception"));
+
+		manualMainThreadExecutor.triggerAll();
+
+		// the entire failover-procedure is being halted by the restart strategy not doing anything
+		// the only thing the failover strategy should do is cancel tasks that require it
+
+		// sanity check to ensure we actually called into the restart strategy
+		assertEquals(1, restartStrategy.getNumberOfQueuedActions());
+		// 3 out of 4 tasks will be canceled, and removed from the set of registered executions
+		assertEquals(1, eg.getRegisteredExecutions().size());
+		// no job state change should occur; in case of a failover we never switch to RESTARTING/CANCELED
+		// the important thing is that we don't switch to failed which would imply that we started a global failover
+		assertEquals(JobStatus.RUNNING, eg.getState());
+	}
+
 	@Test
 	public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
 		final JobGraph jobGraph = createStreamingJobGraph();
@@ -267,6 +300,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		completeCancelling(ev11, ev12, ev21, ev22);
 
 		manualMainThreadExecutor.triggerAll();
+		manualMainThreadExecutor.triggerScheduledTasks();
 
 		final long globalModVersionAfterFailure = eg.getGlobalModVersion();
 
@@ -340,7 +374,7 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 	}
 
 	private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
-		return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10));
+		return createExecutionGraph(jobGraph, new FixedDelayRestartStrategy(10, 0));
 	}
 
 	private ExecutionGraph createExecutionGraph(