You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/05/12 18:54:06 UTC

[flink] 02/02: [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02d30ace69dc18555a5085eccf70ee884e73a16e
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Fri May 7 08:35:04 2021 +0200

    [FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restarting state.
    
    The Canceling state of Adaptive Scheduler was expecting the ExecutionGraph to be in state RUNNING when entering the state.
    However, the Restarting state already cancels the ExecutionGraph, so the ExecutionGraph can be in state CANCELING or CANCELED when entering the Canceling state.
    
    Calling the ExecutionGraph.cancel() method in the Canceling state while the ExecutionGraph is already in state CANCELING or CANCELED is not a problem.
    
    The change is guarded by a new ITCase, as this issue affects the interplay between different AS states.
    
    This closes #15882
---
 .../adaptive/StateWithExecutionGraph.java          |  2 -
 .../adaptive/AdaptiveSchedulerSimpleITCase.java    | 44 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 9962c78..91d688f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -91,8 +91,6 @@ abstract class StateWithExecutionGraph implements State {
         this.operatorCoordinatorHandler = operatorCoordinatorHandler;
         this.kvStateHandler = new KvStateHandler(executionGraph);
         this.logger = logger;
-        Preconditions.checkState(
-                executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
 
         FutureUtils.assertNoException(
                 executionGraph
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
index 9280dbc..992d2c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerSimpleITCase.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -43,6 +46,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 
 import static org.junit.Assert.assertTrue;
 
@@ -105,6 +109,34 @@ public class AdaptiveSchedulerSimpleITCase extends TestLogger {
     }
 
     @Test
+    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
+        final long timeInRestartingState = 10000L;
+
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
+        alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
+        alwaysFailingOperator.setParallelism(1);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        // configure a high delay between attempts: We'll stay in RESTARTING for 10 seconds.
+        executionConfig.setRestartStrategy(
+                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
+        jobGraph.setExecutionConfig(executionConfig);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        // wait until we are in RESTARTING state
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
+                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
+                5);
+
+        // now cancel while in RESTARTING state
+        miniCluster.cancelJob(jobGraph.getJobID()).get();
+    }
+
+    @Test
     public void testGlobalFailoverIfTaskFails() throws Throwable {
         final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
         final JobGraph jobGraph = createOnceFailingJobGraph();
@@ -160,4 +192,16 @@ public class AdaptiveSchedulerSimpleITCase extends TestLogger {
             hasFailed = false;
         }
     }
+
+    /** Always failing {@link AbstractInvokable}. */
+    public static final class AlwaysFailingInvokable extends AbstractInvokable {
+        public AlwaysFailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            throw new FlinkRuntimeException("Test failure.");
+        }
+    }
 }