You are viewing a plain-text version of this content. The link to its canonical version (originally attached to the word "here") was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:28:02 UTC

[flink] 13/13: [FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79aa7d115db5c4b5471b415dccbcfcb5b0d10249
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 14 22:50:56 2020 +0200

    [FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"
    
    This closes #12137
---
 .../flink/runtime/scheduler/DefaultScheduler.java  | 24 +++++++++++-
 .../OperatorCoordinatorSchedulerTest.java          | 43 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e42af43..f1a9924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
@@ -274,8 +275,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 	}
 
 	private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+		final ExecutionVertex vertex = getExecutionVertex(executionVertexId);
+
+		notifyCoordinatorOfCancellation(vertex);
+
 		executionSlotAllocator.cancel(executionVertexId);
-		return executionVertexOperations.cancel(getExecutionVertex(executionVertexId));
+		return executionVertexOperations.cancel(vertex);
 	}
 
 	@Override
@@ -468,4 +473,21 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			handleTaskDeploymentFailure(executionVertexId, e);
 		}
 	}
+
+	private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
+		// this method makes a best effort to filter out duplicate notifications, meaning cases where
+		// the coordinator was already notified for that specific task
+		// we don't notify if the task is already FAILED, CANCELING, or CANCELED
+
+		final ExecutionState currentState = vertex.getExecutionState();
+		if (currentState == ExecutionState.FAILED ||
+				currentState == ExecutionState.CANCELING ||
+				currentState == ExecutionState.CANCELED) {
+			return;
+		}
+
+		for (OperatorCoordinator coordinator : vertex.getJobVertex().getOperatorCoordinators()) {
+			coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 80ec9c9..935e0ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matcher;
 import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -67,6 +68,7 @@ import java.util.function.Consumer;
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
 import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertArrayEquals;
@@ -173,6 +175,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	@Test
+	public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerWithAllRestartOnFailureAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		failTask(scheduler, 1);
+
+		assertEquals(2, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), containsInAnyOrder(0, 1));
+	}
+
+	@Test
 	public void taskRepeatedFailureNotifyCoordinator() throws Exception {
 		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
 		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
@@ -204,6 +217,36 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	// THESE TESTS BELOW SHOULD LEGITIMATELY WORK, BUT THE SCHEDULER ITSELF SEEMS TO NOT HANDLE
+	// THIS SITUATION AT THE MOMENT
+	// WE KEEP THESE TESTS HERE TO ENABLE THEM ONCE THE SCHEDULER'S CONTRACT SUPPORTS THEM
+
+	@Ignore
+	@Test
+	public void deployingTaskCancellationNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		cancelTask(scheduler, 1);
+
+		assertEquals(1, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(1));
+		assertThat(coordinator.getFailedTasks(), not(contains(0)));
+	}
+
+	@Ignore
+	@Test
+	public void runningTaskCancellationNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		cancelTask(scheduler, 0);
+
+		assertEquals(1, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(0));
+		assertThat(coordinator.getFailedTasks(), not(contains(1)));
+	}
+
 	// ------------------------------------------------------------------------
 	//  tests for checkpointing
 	// ------------------------------------------------------------------------