You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/20 08:27:46 UTC
[flink] branch master updated (dbe4f473ba9 -> 3278995372d)
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
from dbe4f473ba9 [hotfix][doc] Fix JavaDoc of chapter ETL. This closes #20077
new 94ece419632 [hotfix][runtime] Strengthen SpeculativeScheduler and its tests
new 3278995372d [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../adaptivebatch/SpeculativeScheduler.java | 40 +++++++++++++++++-----
.../adaptivebatch/SpeculativeSchedulerTest.java | 37 ++++++++++++++++++--
2 files changed, 66 insertions(+), 11 deletions(-)
[flink] 02/02: [FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions
Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3278995372d1ea27b6fd86806e9a860a644694c7
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jul 15 16:06:13 2022 +0800
[FLINK-28612][runtime] SpeculativeScheduler cancels pending slot allocation after canceling pending executions
This closes #20312.
---
.../adaptivebatch/SpeculativeScheduler.java | 30 ++++++++++++++++------
.../adaptivebatch/SpeculativeSchedulerTest.java | 27 ++++++++++++++++++-
2 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 6d9302c1226..b1482caec55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -178,15 +178,29 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
private CompletableFuture<?> cancelPendingExecutions(
final ExecutionVertexID executionVertexId) {
- // cancel all the related pending requests to avoid that slots returned by the canceled
- // vertices are used to fulfill these pending requests
- // do not cancel the FINISHED execution
- cancelAllPendingSlotRequestsForVertex(executionVertexId);
- return FutureUtils.combineAll(
+ final List<Execution> pendingExecutions =
getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
- .filter(e -> e.getState() != ExecutionState.FINISHED)
- .map(this::cancelExecution)
- .collect(Collectors.toList()));
+ .filter(
+ e ->
+ !e.getState().isTerminal()
+ && e.getState() != ExecutionState.CANCELING)
+ .collect(Collectors.toList());
+ if (pendingExecutions.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ log.info(
+ "Canceling {} un-finished executions of {} because one of its executions has finished.",
+ pendingExecutions.size(),
+ executionVertexId);
+
+ final CompletableFuture<?> future =
+ FutureUtils.combineAll(
+ pendingExecutions.stream()
+ .map(this::cancelExecution)
+ .collect(Collectors.toList()));
+ cancelAllPendingSlotRequestsForVertex(executionVertexId);
+ return future;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index ef78ad085f7..29d73161153 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -45,6 +45,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -90,6 +92,8 @@ class SpeculativeSchedulerTest {
private TestExecutionOperationsDecorator testExecutionOperations;
private TestBlocklistOperations testBlocklistOperations;
private TestRestartBackoffTimeStrategy restartStrategy;
+ private TestExecutionSlotAllocatorFactory testExecutionSlotAllocatorFactory;
+ private TestExecutionSlotAllocator testExecutionSlotAllocator;
@BeforeEach
void setUp() {
@@ -100,6 +104,9 @@ class SpeculativeSchedulerTest {
new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
testBlocklistOperations = new TestBlocklistOperations();
restartStrategy = new TestRestartBackoffTimeStrategy(true, 0);
+ testExecutionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+ testExecutionSlotAllocator =
+ testExecutionSlotAllocatorFactory.getTestExecutionSlotAllocator();
}
@AfterEach
@@ -213,7 +220,7 @@ class SpeculativeSchedulerTest {
}
@Test
- void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() {
+ void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() {
final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
final Execution attempt1 = ev.getCurrentExecutionAttempt();
@@ -226,6 +233,23 @@ class SpeculativeSchedulerTest {
assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING);
}
+ @Test
+ void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() {
+ testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+ final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+ final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+ final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+ testExecutionSlotAllocator.completePendingRequest(attempt1.getAttemptId());
+ notifySlowTask(scheduler, attempt1);
+ final Execution attempt2 = getExecution(ev, 1);
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(attempt1.getAttemptId(), ExecutionState.FINISHED));
+
+ assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELED);
+ }
+
@Test
void testExceptionHistoryIfPartitionExceptionHappened() {
final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
@@ -408,6 +432,7 @@ class SpeculativeSchedulerTest {
.setFutureExecutor(futureExecutor)
.setDelayExecutor(taskRestartExecutor)
.setRestartBackoffTimeStrategy(restartStrategy)
+ .setExecutionSlotAllocatorFactory(testExecutionSlotAllocatorFactory)
.setJobMasterConfiguration(configuration);
}
[flink] 01/02: [hotfix][runtime] Strengthen SpeculativeScheduler and its tests
Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 94ece419632ce57f01558cd0fec9ab0d182d4f4b
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jul 15 15:45:32 2022 +0800
[hotfix][runtime] Strengthen SpeculativeScheduler and its tests
Stabilizes tests and better exposes exceptions for troubleshooting.
---
.../runtime/scheduler/adaptivebatch/SpeculativeScheduler.java | 10 +++++++++-
.../scheduler/adaptivebatch/SpeculativeSchedulerTest.java | 10 +++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 919ec05cf1d..6d9302c1226 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -307,7 +307,15 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler
return slowExecutions.stream()
.map(id -> getExecutionGraph().getRegisteredExecutions().get(id))
- .map(Execution::getAssignedResourceLocation)
+ .map(
+ e -> {
+ checkNotNull(
+ e.getAssignedResource(),
+ "The reported slow node have not been assigned a slot. "
+ + "This is unexpected and indicates that there is "
+ + "something wrong with the slow task detector.");
+ return e.getAssignedResourceLocation();
+ })
.map(TaskManagerLocation::getNodeId)
.collect(Collectors.toSet());
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
index 75456468447..ef78ad085f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -20,6 +20,8 @@
package org.apache.flink.runtime.scheduler.adaptivebatch;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -59,6 +61,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -394,13 +397,18 @@ class SpeculativeSchedulerTest {
private DefaultSchedulerBuilder createSchedulerBuilder(
final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) {
+ // disable periodical slow task detection to avoid affecting the designed testing process
+ final Configuration configuration = new Configuration();
+ configuration.set(SlowTaskDetectorOptions.CHECK_INTERVAL, Duration.ofDays(1));
+
return new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.setBlocklistOperations(testBlocklistOperations)
.setExecutionOperations(testExecutionOperations)
.setFutureExecutor(futureExecutor)
.setDelayExecutor(taskRestartExecutor)
- .setRestartBackoffTimeStrategy(restartStrategy);
+ .setRestartBackoffTimeStrategy(restartStrategy)
+ .setJobMasterConfiguration(configuration);
}
private static void notifySlowTask(