You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:28:02 UTC
[flink] 13/13: [FLINK-17702][scheduler] Cancellations during
failover also notify the OperatorCoordinator as "failed tasks"
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 79aa7d115db5c4b5471b415dccbcfcb5b0d10249
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 14 22:50:56 2020 +0200
[FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"
This closes #12137
---
.../flink/runtime/scheduler/DefaultScheduler.java | 24 +++++++++++-
.../OperatorCoordinatorSchedulerTest.java | 43 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e42af43..f1a9924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
@@ -274,8 +275,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
}
+ /**
+ * Cancels the given execution vertex. Its operator coordinators are notified first, then the
+ * vertex's slot allocation is cancelled via {@code executionSlotAllocator}, and finally the
+ * execution itself is cancelled via {@code executionVertexOperations}.
+ */
private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+ // Resolve the vertex once and reuse it for both the notification and the cancellation.
+ final ExecutionVertex vertex = getExecutionVertex(executionVertexId);
+
+ // Notify BEFORE cancelling: notifyCoordinatorOfCancellation skips vertices that are already
+ // CANCELING, and the cancel call below presumably moves the vertex into that state (TODO confirm),
+ // which would suppress the notification. Keep this ordering.
+ notifyCoordinatorOfCancellation(vertex);
+
executionSlotAllocator.cancel(executionVertexId);
- return executionVertexOperations.cancel(getExecutionVertex(executionVertexId));
+ return executionVertexOperations.cancel(vertex);
}
@Override
@@ -468,4 +473,21 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
handleTaskDeploymentFailure(executionVertexId, e);
}
}
+
+ /**
+ * Reports a cancelled subtask to every operator coordinator of the vertex's job vertex as a
+ * failed subtask, so that cancellations (e.g. during failover) reach the coordinators like
+ * regular task failures.
+ *
+ * <p>The failure reason passed to {@code subtaskFailed} is {@code null}, because no exception
+ * is attached to a cancellation here.
+ *
+ * @param vertex the execution vertex that is about to be cancelled
+ */
+ private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
+ // This method makes a best effort to filter out duplicate notifications, i.e. cases where
+ // the coordinator was already notified for that specific task:
+ // we don't notify if the task is already FAILED, CANCELING, or CANCELED.
+
+ final ExecutionState currentState = vertex.getExecutionState();
+ if (currentState == ExecutionState.FAILED ||
+ currentState == ExecutionState.CANCELING ||
+ currentState == ExecutionState.CANCELED) {
+ return;
+ }
+
+ // Every coordinator attached to this job vertex is told about the same subtask index.
+ for (OperatorCoordinator coordinator : vertex.getJobVertex().getOperatorCoordinators()) {
+ coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 80ec9c9..935e0ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.After;
+import org.junit.Ignore;
import org.junit.Test;
import javax.annotation.Nullable;
@@ -67,6 +68,7 @@ import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertArrayEquals;
@@ -173,6 +175,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
}
@Test
+ public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {
+ final DefaultScheduler scheduler = createSchedulerWithAllRestartOnFailureAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ // Failing subtask 1 triggers a failover that (presumably, given the restart-all scheduler
+ // created above) also cancels subtask 0. The coordinator must therefore see both
+ // subtasks reported as failed: the failed one and the one cancelled by the failover.
+ failTask(scheduler, 1);
+
+ assertEquals(2, coordinator.getFailedTasks().size());
+ assertThat(coordinator.getFailedTasks(), containsInAnyOrder(0, 1));
+ }
+
+ @Test
public void taskRepeatedFailureNotifyCoordinator() throws Exception {
final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
@@ -204,6 +217,36 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
assertThat(result, futureFailedWith(TestException.class));
}
+ // The tests below should legitimately pass, but the scheduler itself does not seem to handle
+ // this situation at the moment.
+ // We keep them here so they can be enabled once the scheduler's contract supports them.
+
+ // Expects that cancelling subtask 1 of a started (per the test name: still deploying) job
+ // notifies the coordinator about subtask 1 only.
+ @Ignore
+ @Test
+ public void deployingTaskCancellationNotifiesCoordinator() throws Exception {
+ final DefaultScheduler scheduler = createAndStartScheduler();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ cancelTask(scheduler, 1);
+
+ assertEquals(1, coordinator.getFailedTasks().size());
+ assertThat(coordinator.getFailedTasks(), contains(1));
+ // NOTE(review): Hamcrest contains(...) matches the whole iterable exactly, so this check is
+ // already implied by contains(1) above; not(hasItem(0)) would express the intent directly.
+ assertThat(coordinator.getFailedTasks(), not(contains(0)));
+ }
+
+ // Expects that cancelling subtask 0 after tasks were deployed notifies the coordinator about
+ // subtask 0 only.
+ @Ignore
+ @Test
+ public void runningTaskCancellationNotifiesCoordinator() throws Exception {
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ cancelTask(scheduler, 0);
+
+ assertEquals(1, coordinator.getFailedTasks().size());
+ assertThat(coordinator.getFailedTasks(), contains(0));
+ // NOTE(review): Hamcrest contains(...) matches the whole iterable exactly, so this check is
+ // already implied by contains(0) above; not(hasItem(1)) would express the intent directly.
+ assertThat(coordinator.getFailedTasks(), not(contains(1)));
+ }
+
// ------------------------------------------------------------------------
// tests for checkpointing
// ------------------------------------------------------------------------