You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:28:01 UTC

[flink] 12/13: [FLINK-17702][tests][refactor] Refactor test utils to support different failover strategies.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfef3db4bd95d03b4d551d99cd10aedde1c326fc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 14 19:05:22 2020 +0200

    [FLINK-17702][tests][refactor] Refactor test utils to support different failover strategies.
---
 .../flink/runtime/scheduler/SchedulerBase.java     |  9 +++
 .../OperatorCoordinatorSchedulerTest.java          | 64 ++++++++++++++--------
 .../runtime/scheduler/SchedulerTestingUtils.java   | 39 +++++++------
 3 files changed, 72 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 3d510fd..c2c911a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -981,4 +981,13 @@ public abstract class SchedulerBase implements SchedulerNG {
 		}
 		return coordinatorMap;
 	}
+
+	// ------------------------------------------------------------------------
+	//  access utils for testing (package-private; called by the test utilities in this package)
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	JobID getJobId() {
+		return jobGraph.getJobID();
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index a3fb582..80ec9c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorSer
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -367,29 +368,21 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
 	}
 
+	private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks() throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, null, true); // no event gateway, no JobGraph pre-processing, restart-all failover strategy
+		scheduleAllTasksToRunning(scheduler);
+		return scheduler;
+	}
+
 	private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
 		final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
-		scheduler.startScheduling();
-		executor.triggerAll();
-		executor.triggerScheduledTasks();
-		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
-
-		// guard test assumptions: this brings tasks into RUNNING state
-		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
-
+		scheduleAllTasksToRunning(scheduler);
 		return scheduler;
 	}
 
 	private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null);
-		scheduler.startScheduling();
-		executor.triggerAll();
-		executor.triggerScheduledTasks();
-		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
-
-		// guard test assumptions: this brings tasks into RUNNING state
-		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
-
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null, false);
+		scheduleAllTasksToRunning(scheduler);
 		return scheduler;
 	}
 
@@ -408,20 +401,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final DefaultScheduler scheduler = setupTestJobAndScheduler(
 				new TestingOperatorCoordinator.Provider(testOperatorId),
 				null,
-				savepointConfigurer);
+				savepointConfigurer,
+				false);
 
 		scheduler.startScheduling();
 		return scheduler;
 	}
 
 	private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
-		return setupTestJobAndScheduler(provider, null, null);
+		return setupTestJobAndScheduler(provider, null, null, false);
 	}
 
 	private DefaultScheduler setupTestJobAndScheduler(
 			OperatorCoordinator.Provider provider,
 			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
-			@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
+			@Nullable Consumer<JobGraph> jobGraphPreProcessing,
+			boolean restartAllOnFailover) throws Exception {
 
 		final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
 		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId, Collections.singletonList(opIds));
@@ -435,15 +430,30 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 			jobGraphPreProcessing.accept(jobGraph);
 		}
 
-		final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
-				? SchedulerTestingUtils.createScheduler(jobGraph, executor)
-				: SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
+		final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder = taskExecutorOperatorEventGateway == null
+				? SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor)
+				: SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor, taskExecutorOperatorEventGateway);
+		if (restartAllOnFailover) {
+			schedulerBuilder.setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory());
+		}
+
+		final DefaultScheduler scheduler = schedulerBuilder.build();
 		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		this.createdScheduler = scheduler;
 		return scheduler;
 	}
 
+	private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
+		scheduler.startScheduling();
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+		// guard test assumptions: the steps above must have brought subtask 0 of the test vertex into RUNNING state
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+	}
+
 	private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {
 		final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
 		assertNotNull("vertex for coordinator not found", vertexWithCoordinator);
@@ -494,6 +504,14 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
 	}
 
+	private void cancelTask(DefaultScheduler scheduler, int subtask) {
+		SchedulerTestingUtils.canceledExecution(scheduler, testVertexId, subtask);
+		executor.triggerAll();
+
+		// guard the test assumptions: This must not lead to a restart, but must keep the task in CANCELED state
+		assertEquals(ExecutionState.CANCELED, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
+	}
+
 	private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
 		final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
 		executor.triggerAll();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 124e733..6f62067 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -142,14 +142,14 @@ public class SchedulerTestingUtils {
 			.build();
 	}
 
-	public static DefaultScheduler createScheduler(
+	public static DefaultSchedulerBuilder createSchedulerBuilder(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
 
 		return createScheduler(jobGraph, asyncExecutor, new SimpleAckingTaskManagerGateway());
 	}
 
-	public static DefaultScheduler createScheduler(
+	public static DefaultSchedulerBuilder createSchedulerBuilder(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskExecutorOperatorEventGateway operatorEventGateway) throws Exception {
@@ -161,7 +161,7 @@ public class SchedulerTestingUtils {
 		return createScheduler(jobGraph, asyncExecutor, gateway);
 	}
 
-	public static DefaultScheduler createScheduler(
+	public static DefaultSchedulerBuilder createScheduler(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskManagerGateway taskManagerGateway) throws Exception {
@@ -171,8 +171,7 @@ public class SchedulerTestingUtils {
 			.setDelayExecutor(asyncExecutor)
 			.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
 			.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
-			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
-			.build();
+			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
 	}
 
 	public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
@@ -240,32 +239,32 @@ public class SchedulerTestingUtils {
 	}
 
 	public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
-		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
-		assert ejv != null;
-		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+			scheduler.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+	}
 
+	public static void canceledExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) { // reports the current attempt of the given subtask as CANCELED
+		final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
 		scheduler.updateTaskExecutionState(new TaskExecutionState(
-			ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+			scheduler.getJobId(), attemptID, ExecutionState.CANCELED, new Exception("test task failure"))); // NOTE(review): cause message says "failure" although the reported state is CANCELED
 	}
 
 	public static void setExecutionToRunning(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
-		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
-		assert ejv != null;
-		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
-
+		final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
 		scheduler.updateTaskExecutionState(new TaskExecutionState(
-			ejv.getJobId(), attemptID, ExecutionState.RUNNING));
+			scheduler.getJobId(), attemptID, ExecutionState.RUNNING));
 	}
 
 	public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
-		final JobID jid = scheduler.requestJob().getJobID();
+		final JobID jid = scheduler.getJobId();
 		getAllCurrentExecutionAttempts(scheduler).forEach(
 			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.RUNNING))
 		);
 	}
 
 	public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
-		final JobID jid = scheduler.requestJob().getJobID();
+		final JobID jid = scheduler.getJobId();
 		getAllCurrentExecutionAttempts(scheduler).forEach(
 			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
 		);
@@ -273,7 +272,7 @@ public class SchedulerTestingUtils {
 
 	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
-		final JobID jid = scheduler.requestJob().getJobID();
+		final JobID jid = scheduler.getJobId();
 
 		for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
 			final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
@@ -333,6 +332,12 @@ public class SchedulerTestingUtils {
 		return scheduler.getExecutionVertex(id).getJobVertex();
 	}
 
+	public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVertexID jvid, int subtask) { // ID of the current execution attempt of the given subtask
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		assert ejv != null; // only checked with assertions enabled (-ea); otherwise a missing vertex surfaces as a NullPointerException below
+		return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {