You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:28:01 UTC
[flink] 12/13: [FLINK-17702][tests][refactor] Refactor test utils
to support different failover strategies.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bfef3db4bd95d03b4d551d99cd10aedde1c326fc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 14 19:05:22 2020 +0200
[FLINK-17702][tests][refactor] Refactor test utils to support different failover strategies.
---
.../flink/runtime/scheduler/SchedulerBase.java | 9 +++
.../OperatorCoordinatorSchedulerTest.java | 64 ++++++++++++++--------
.../runtime/scheduler/SchedulerTestingUtils.java | 39 +++++++------
3 files changed, 72 insertions(+), 40 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 3d510fd..c2c911a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -981,4 +981,13 @@ public abstract class SchedulerBase implements SchedulerNG {
}
return coordinatorMap;
}
+
+ // ------------------------------------------------------------------------
+ // access utils for testing
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ JobID getJobId() {
+ return jobGraph.getJobID();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index a3fb582..80ec9c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorSer
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -367,29 +368,21 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
}
+ private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks() throws Exception {
+ final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, null, true);
+ scheduleAllTasksToRunning(scheduler);
+ return scheduler;
+ }
+
private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
- scheduler.startScheduling();
- executor.triggerAll();
- executor.triggerScheduledTasks();
- SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
-
- // guard test assumptions: this brings tasks into RUNNING state
- assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
-
+ scheduleAllTasksToRunning(scheduler);
return scheduler;
}
private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
- final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null);
- scheduler.startScheduling();
- executor.triggerAll();
- executor.triggerScheduledTasks();
- SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
-
- // guard test assumptions: this brings tasks into RUNNING state
- assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
-
+ final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null, false);
+ scheduleAllTasksToRunning(scheduler);
return scheduler;
}
@@ -408,20 +401,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
final DefaultScheduler scheduler = setupTestJobAndScheduler(
new TestingOperatorCoordinator.Provider(testOperatorId),
null,
- savepointConfigurer);
+ savepointConfigurer,
+ false);
scheduler.startScheduling();
return scheduler;
}
private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
- return setupTestJobAndScheduler(provider, null, null);
+ return setupTestJobAndScheduler(provider, null, null, false);
}
private DefaultScheduler setupTestJobAndScheduler(
OperatorCoordinator.Provider provider,
@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
- @Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
+ @Nullable Consumer<JobGraph> jobGraphPreProcessing,
+ boolean restartAllOnFailover) throws Exception {
final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId, Collections.singletonList(opIds));
@@ -435,15 +430,30 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
jobGraphPreProcessing.accept(jobGraph);
}
- final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
- ? SchedulerTestingUtils.createScheduler(jobGraph, executor)
- : SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
+ final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder = taskExecutorOperatorEventGateway == null
+ ? SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor)
+ : SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor, taskExecutorOperatorEventGateway);
+ if (restartAllOnFailover) {
+ schedulerBuilder.setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory());
+ }
+
+ final DefaultScheduler scheduler = schedulerBuilder.build();
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
this.createdScheduler = scheduler;
return scheduler;
}
+ private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
+ scheduler.startScheduling();
+ executor.triggerAll();
+ executor.triggerScheduledTasks();
+ SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+ // guard test assumptions: this brings tasks into RUNNING state
+ assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+ }
+
private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {
final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
assertNotNull("vertex for coordinator not found", vertexWithCoordinator);
@@ -494,6 +504,14 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
}
+ private void cancelTask(DefaultScheduler scheduler, int subtask) {
+ SchedulerTestingUtils.canceledExecution(scheduler, testVertexId, subtask);
+ executor.triggerAll();
+
+ // guard the test assumptions: This must not lead to a restart, but must keep the task in CANCELED state
+ assertEquals(ExecutionState.CANCELED, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
+ }
+
private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
executor.triggerAll();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 124e733..6f62067 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -142,14 +142,14 @@ public class SchedulerTestingUtils {
.build();
}
- public static DefaultScheduler createScheduler(
+ public static DefaultSchedulerBuilder createSchedulerBuilder(
JobGraph jobGraph,
ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
return createScheduler(jobGraph, asyncExecutor, new SimpleAckingTaskManagerGateway());
}
- public static DefaultScheduler createScheduler(
+ public static DefaultSchedulerBuilder createSchedulerBuilder(
JobGraph jobGraph,
ManuallyTriggeredScheduledExecutorService asyncExecutor,
TaskExecutorOperatorEventGateway operatorEventGateway) throws Exception {
@@ -161,7 +161,7 @@ public class SchedulerTestingUtils {
return createScheduler(jobGraph, asyncExecutor, gateway);
}
- public static DefaultScheduler createScheduler(
+ public static DefaultSchedulerBuilder createScheduler(
JobGraph jobGraph,
ManuallyTriggeredScheduledExecutorService asyncExecutor,
TaskManagerGateway taskManagerGateway) throws Exception {
@@ -171,8 +171,7 @@ public class SchedulerTestingUtils {
.setDelayExecutor(asyncExecutor)
.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
- .setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
- .build();
+ .setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
}
public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
@@ -240,32 +239,32 @@ public class SchedulerTestingUtils {
}
public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
- final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
- assert ejv != null;
- final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+ final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
+ scheduler.updateTaskExecutionState(new TaskExecutionState(
+ scheduler.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+ }
+ public static void canceledExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+ final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
scheduler.updateTaskExecutionState(new TaskExecutionState(
- ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+ scheduler.getJobId(), attemptID, ExecutionState.CANCELED, new Exception("test task failure")));
}
public static void setExecutionToRunning(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
- final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
- assert ejv != null;
- final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
-
+ final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
scheduler.updateTaskExecutionState(new TaskExecutionState(
- ejv.getJobId(), attemptID, ExecutionState.RUNNING));
+ scheduler.getJobId(), attemptID, ExecutionState.RUNNING));
}
public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
- final JobID jid = scheduler.requestJob().getJobID();
+ final JobID jid = scheduler.getJobId();
getAllCurrentExecutionAttempts(scheduler).forEach(
(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.RUNNING))
);
}
public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
- final JobID jid = scheduler.requestJob().getJobID();
+ final JobID jid = scheduler.getJobId();
getAllCurrentExecutionAttempts(scheduler).forEach(
(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
);
@@ -273,7 +272,7 @@ public class SchedulerTestingUtils {
public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
- final JobID jid = scheduler.requestJob().getJobID();
+ final JobID jid = scheduler.getJobId();
for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
@@ -333,6 +332,12 @@ public class SchedulerTestingUtils {
return scheduler.getExecutionVertex(id).getJobVertex();
}
+ public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+ final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+ assert ejv != null;
+ return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+ }
+
// ------------------------------------------------------------------------
private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {