You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/20 08:27:46 UTC

[flink] branch master updated (dbe4f473ba9 -> 3278995372d)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from dbe4f473ba9 [hotfix][doc] Fix JavaDoc of chapter ETL. This closes #20077
     new 94ece419632 [hotfix][runtime] Strengthen SpeculativeScheduler and its tests
     new 3278995372d [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../adaptivebatch/SpeculativeScheduler.java        | 40 +++++++++++++++++-----
 .../adaptivebatch/SpeculativeSchedulerTest.java    | 37 ++++++++++++++++++--
 2 files changed, 66 insertions(+), 11 deletions(-)


[flink] 02/02: [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3278995372d1ea27b6fd86806e9a860a644694c7
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jul 15 16:06:13 2022 +0800

    [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions
    
    This closes #20312.
---
 .../adaptivebatch/SpeculativeScheduler.java        | 30 ++++++++++++++++------
 .../adaptivebatch/SpeculativeSchedulerTest.java    | 27 ++++++++++++++++++-
 2 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 6d9302c1226..b1482caec55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -178,15 +178,29 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
     private CompletableFuture<?> cancelPendingExecutions(
             final ExecutionVertexID executionVertexId) {
-        // cancel all the related pending requests to avoid that slots returned by the canceled
-        // vertices are used to fulfill these pending requests
-        // do not cancel the FINISHED execution
-        cancelAllPendingSlotRequestsForVertex(executionVertexId);
-        return FutureUtils.combineAll(
+        final List<Execution> pendingExecutions =
                 getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
-                        .filter(e -> e.getState() != ExecutionState.FINISHED)
-                        .map(this::cancelExecution)
-                        .collect(Collectors.toList()));
+                        .filter(
+                                e ->
+                                        !e.getState().isTerminal()
+                                                && e.getState() != ExecutionState.CANCELING)
+                        .collect(Collectors.toList());
+        if (pendingExecutions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        log.info(
+                "Canceling {} un-finished executions of {} because one of its executions has finished.",
+                pendingExecutions.size(),
+                executionVertexId);
+
+        final CompletableFuture<?> future =
+                FutureUtils.combineAll(
+                        pendingExecutions.stream()
+                                .map(this::cancelExecution)
+                                .collect(Collectors.toList()));
+        cancelAllPendingSlotRequestsForVertex(executionVertexId);
+        return future;
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index ef78ad085f7..29d73161153 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -45,6 +45,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -90,6 +92,8 @@ class SpeculativeSchedulerTest {
     private TestExecutionOperationsDecorator testExecutionOperations;
     private TestBlocklistOperations testBlocklistOperations;
     private TestRestartBackoffTimeStrategy restartStrategy;
+    private TestExecutionSlotAllocatorFactory testExecutionSlotAllocatorFactory;
+    private TestExecutionSlotAllocator testExecutionSlotAllocator;
 
     @BeforeEach
     void setUp() {
@@ -100,6 +104,9 @@ class SpeculativeSchedulerTest {
                 new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
         testBlocklistOperations = new TestBlocklistOperations();
         restartStrategy = new TestRestartBackoffTimeStrategy(true, 0);
+        testExecutionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+        testExecutionSlotAllocator =
+                testExecutionSlotAllocatorFactory.getTestExecutionSlotAllocator();
     }
 
     @AfterEach
@@ -213,7 +220,7 @@ class SpeculativeSchedulerTest {
     }
 
     @Test
-    void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() {
+    void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() {
         final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
         final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
         final Execution attempt1 = ev.getCurrentExecutionAttempt();
@@ -226,6 +233,23 @@ class SpeculativeSchedulerTest {
         assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING);
     }
 
+    @Test
+    void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() {
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        testExecutionSlotAllocator.completePendingRequest(attempt1.getAttemptId());
+        notifySlowTask(scheduler, attempt1);
+        final Execution attempt2 = getExecution(ev, 1);
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(attempt1.getAttemptId(), ExecutionState.FINISHED));
+
+        assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELED);
+    }
+
     @Test
     void testExceptionHistoryIfPartitionExceptionHappened() {
         final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
@@ -408,6 +432,7 @@ class SpeculativeSchedulerTest {
                 .setFutureExecutor(futureExecutor)
                 .setDelayExecutor(taskRestartExecutor)
                 .setRestartBackoffTimeStrategy(restartStrategy)
+                .setExecutionSlotAllocatorFactory(testExecutionSlotAllocatorFactory)
                 .setJobMasterConfiguration(configuration);
     }
 


[flink] 01/02: [hotfix][runtime] Strengthen SpeculativeScheduler and its tests

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94ece419632ce57f01558cd0fec9ab0d182d4f4b
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jul 15 15:45:32 2022 +0800

    [hotfix][runtime] Strengthen SpeculativeScheduler and its tests
    
    Stabilizes tests and better exposes exceptions for troubleshooting.
---
 .../runtime/scheduler/adaptivebatch/SpeculativeScheduler.java  | 10 +++++++++-
 .../scheduler/adaptivebatch/SpeculativeSchedulerTest.java      | 10 +++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 919ec05cf1d..6d9302c1226 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -307,7 +307,15 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
 
         return slowExecutions.stream()
                 .map(id -> getExecutionGraph().getRegisteredExecutions().get(id))
-                .map(Execution::getAssignedResourceLocation)
+                .map(
+                        e -> {
+                            checkNotNull(
+                                    e.getAssignedResource(),
+                                    "The reported slow node have not been assigned a slot. "
+                                            + "This is unexpected and indicates that there is "
+                                            + "something wrong with the slow task detector.");
+                            return e.getAssignedResourceLocation();
+                        })
                 .map(TaskManagerLocation::getNodeId)
                 .collect(Collectors.toSet());
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index 75456468447..ef78ad085f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -20,6 +20,8 @@
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
 import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -59,6 +61,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -394,13 +397,18 @@ class SpeculativeSchedulerTest {
 
     private DefaultSchedulerBuilder createSchedulerBuilder(
             final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) {
+        // disable periodical slow task detection to avoid affecting the designed testing process
+        final Configuration configuration = new Configuration();
+        configuration.set(SlowTaskDetectorOptions.CHECK_INTERVAL, Duration.ofDays(1));
+
         return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                 .setBlocklistOperations(testBlocklistOperations)
                 .setExecutionOperations(testExecutionOperations)
                 .setFutureExecutor(futureExecutor)
                 .setDelayExecutor(taskRestartExecutor)
-                .setRestartBackoffTimeStrategy(restartStrategy);
+                .setRestartBackoffTimeStrategy(restartStrategy)
+                .setJobMasterConfiguration(configuration);
     }
 
     private static void notifySlowTask(