You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:15 UTC
[flink] 03/05: [hotfix][test] Adds unit test for local and global
failure happened concurrently
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e99f0ad2cbd1c1bc69b11081d3ec47155dae39f
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 24 13:53:53 2021 +0100
[hotfix][test] Adds unit test for local and global failure happened concurrently
This test should verify that the scheduler works as expected when a local
failure follows a global fail-over operation before the restart has happened.
---
.../runtime/scheduler/DefaultSchedulerTest.java | 58 ++++++++++++++++++++++
1 file changed, 58 insertions(+)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 699bb71..7462412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -88,6 +89,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
@@ -588,6 +591,61 @@ public class DefaultSchedulerTest extends TestLogger {
assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
}
+ /**
+ * Covers a global fail-over followed by a local task failure that is reported before the
+ * restart happens. Verifies that each vertex is deployed once at scheduling start and once more
+ * after the global fail-over (the local failure adds no extra deployment), and that the global
+ * recovery assert in {@link SchedulerBase#restoreState} is not triggered due to version updates.
+ */
+ @Test
+ public void handleGlobalFailureWithLocalFailure() {
+ final JobGraph jobGraph = singleJobVertexJobGraph(2);
+ final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+ enableCheckpointing(jobGraph); // presumably needed to exercise the restoreState recovery path
+
+ final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+ // current attempt IDs of all vertices, captured right after scheduling has started
+ final List<ExecutionAttemptID> attemptIds =
+ StreamSupport.stream(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices()
+ .spliterator(),
+ false)
+ .map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+ .map(ArchivedExecution::getAttemptId)
+ .collect(Collectors.toList());
+ final ExecutionAttemptID localFailureAttemptId = attemptIds.get(0);
+ scheduler.handleGlobalFailure(new Exception("global failure"));
+ // local failure reported before the restart; it must not cause an extra deployment
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ localFailureAttemptId,
+ ExecutionState.FAILED,
+ new Exception("local failure")));
+ // report every original attempt (including the locally failed one) as CANCELED
+ for (ExecutionAttemptID attemptId : attemptIds) {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(attemptId, ExecutionState.CANCELED));
+ }
+
+ taskRestartExecutor.triggerScheduledTasks(); // presumably runs the pending restart
+
+ final ExecutionVertexID executionVertexId0 =
+ new ExecutionVertexID(onlyJobVertex.getID(), 0);
+ final ExecutionVertexID executionVertexId1 =
+ new ExecutionVertexID(onlyJobVertex.getID(), 1);
+ assertThat(
+ "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.",
+ testExecutionVertexOperations.getDeployedVertices(),
+ contains(
+ executionVertexId0,
+ executionVertexId1,
+ executionVertexId0,
+ executionVertexId1));
+ }
+
@Test
public void vertexIsNotAffectedByOutdatedDeployment() {
final JobGraph jobGraph = singleJobVertexJobGraph(2);