You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:15 UTC

[flink] 03/05: [hotfix][test] Adds unit test for local and global failure happened concurrently

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e99f0ad2cbd1c1bc69b11081d3ec47155dae39f
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 24 13:53:53 2021 +0100

    [hotfix][test] Adds unit test for local and global failure happened concurrently
    
    This test should verify that the scheduler works as expected when a local
    failure follows a global fail-over operation before the restart happens.
---
 .../runtime/scheduler/DefaultSchedulerTest.java    | 58 ++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 699bb71..7462412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -88,6 +89,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
@@ -588,6 +591,61 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
     }
 
+    /**
+     * Covers a global fail-over that is followed by a local task failure before the restart runs.
+     * Besides checking the expected deployment order, it verifies that the assert in the global
+     * recovery handling of {@link SchedulerBase#restoreState} is not tripped by version updates
+     * made while the two failures interleave.
+     */
+    @Test
+    public void handleGlobalFailureWithLocalFailure() {
+        final JobGraph jobGraph = singleJobVertexJobGraph(2);
+        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+        enableCheckpointing(jobGraph);
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+        // capture the current attempt ID of each vertex after the initial scheduling
+        final List<ExecutionAttemptID> attemptIds =
+                StreamSupport.stream(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices()
+                                        .spliterator(),
+                                false)
+                        .map(ArchivedExecutionVertex::getCurrentExecutionAttempt)
+                        .map(ArchivedExecution::getAttemptId)
+                        .collect(Collectors.toList());
+        final ExecutionAttemptID localFailureAttemptId = attemptIds.get(0);
+        scheduler.handleGlobalFailure(new Exception("global failure"));
+        // a local failure arriving before the restart must not affect the global fail-over
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        localFailureAttemptId,
+                        ExecutionState.FAILED,
+                        new Exception("local failure")));
+        // mark every original attempt as CANCELED before triggering the restart
+        for (ExecutionAttemptID attemptId : attemptIds) {
+            scheduler.updateTaskExecutionState(
+                    new TaskExecutionState(attemptId, ExecutionState.CANCELED));
+        }
+        // run the scheduled restart task(s), which should redeploy both vertices once more
+        taskRestartExecutor.triggerScheduledTasks();
+
+        final ExecutionVertexID executionVertexId0 =
+                new ExecutionVertexID(onlyJobVertex.getID(), 0);
+        final ExecutionVertexID executionVertexId1 =
+                new ExecutionVertexID(onlyJobVertex.getID(), 1);
+        assertThat(
+                "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.",
+                testExecutionVertexOperations.getDeployedVertices(),
+                contains(
+                        executionVertexId0,
+                        executionVertexId1,
+                        executionVertexId0,
+                        executionVertexId1));
+    }
+
     @Test
     public void vertexIsNotAffectedByOutdatedDeployment() {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);