You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/02/11 14:05:04 UTC

[flink] branch master updated (56e0b92 -> fe41328)

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 56e0b92  [hotfix][ci] Fix hugo docs builds
     new 3999623  [FLINK-21258] Add Canceling state for DeclarativeScheduler
     new fe41328  [FLINK-21258] Add test for Canceling state

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/executiongraph/ExecutionGraph.java     |   3 +-
 .../runtime/scheduler/declarative/Canceling.java   |  69 +++++++
 .../scheduler/declarative/CancelingTest.java       | 228 +++++++++++++++++++++
 .../scheduler/declarative/ExecutingTest.java       |  22 +-
 .../scheduler/declarative/RestartingTest.java      |   2 +-
 5 files changed, 312 insertions(+), 12 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Canceling.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/CancelingTest.java


[flink] 01/02: [FLINK-21258] Add Canceling state for DeclarativeScheduler

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39996236d6a591311e15ee58a367c9b7cf124c5d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 9 10:28:05 2021 +0100

    [FLINK-21258] Add Canceling state for DeclarativeScheduler
---
 .../runtime/scheduler/declarative/Canceling.java   | 69 ++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Canceling.java
new file mode 100644
index 0000000..ea48ccd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/Canceling.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+
+import org.slf4j.Logger;
+
+/** State of the declarative scheduler for a job whose execution graph is being canceled. */
+class Canceling extends StateWithExecutionGraph {
+
+    private final Context context;
+
+    Canceling(
+            Context context,
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger) {
+        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
+        this.context = context;
+    }
+
+    @Override
+    public void onEnter() {
+        getExecutionGraph().cancel();
+    }
+
+    @Override
+    public void cancel() {
+        // no-op: cancellation of the execution graph was already triggered in onEnter()
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        // global failures are deliberately ignored while the job is being canceled
+    }
+
+    @Override
+    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
+        return getExecutionGraph().updateState(taskExecutionStateTransition);
+    }
+
+    @Override
+    void onGloballyTerminalState(JobStatus globallyTerminalState) {
+        context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
+    }
+}


[flink] 02/02: [FLINK-21258] Add test for Canceling state

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe413286b8082c4089a05604da0aba285950563e
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Tue Feb 9 10:25:46 2021 +0100

    [FLINK-21258] Add test for Canceling state
    
    This closes #14909
---
 .../runtime/executiongraph/ExecutionGraph.java     |   3 +-
 .../scheduler/declarative/CancelingTest.java       | 228 +++++++++++++++++++++
 .../scheduler/declarative/ExecutingTest.java       |  22 +-
 .../scheduler/declarative/RestartingTest.java      |   2 +-
 4 files changed, 243 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a057b8c..85a7cde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -895,7 +895,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
         }
     }
 
-    private ConjunctFuture<Void> cancelVerticesAsync() {
+    @VisibleForTesting
+    protected ConjunctFuture<Void> cancelVerticesAsync() {
         final ArrayList<CompletableFuture<?>> futures =
                 new ArrayList<>(verticesInCreationOrder.size());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/CancelingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/CancelingTest.java
new file mode 100644
index 0000000..aa14a0a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/CancelingTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.NoOpExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link Canceling} state of the declarative scheduler. */
+public class CancelingTest extends TestLogger {
+
+    @Test
+    public void testExecutionGraphCancelationOnEnter() throws Exception {
+        try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
+            MockExecutionGraph mockExecutionGraph = new MockExecutionGraph();
+            Canceling canceling = createCancelingState(ctx, mockExecutionGraph);
+
+            canceling.onEnter(); // transition of EG from RUNNING to CANCELLING
+            assertThat(mockExecutionGraph.getState(), is(JobStatus.CANCELLING));
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedWhenCancellationCompletes() throws Exception {
+        try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
+            MockExecutionGraph mockExecutionGraph = new MockExecutionGraph();
+            Canceling canceling = createCancelingState(ctx, mockExecutionGraph);
+            canceling.onEnter(); // transition of EG from RUNNING to CANCELLING
+            assertThat(mockExecutionGraph.getState(), is(JobStatus.CANCELLING));
+            ctx.setExpectFinished(
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), is(JobStatus.CANCELED)));
+            // this transitions the EG from CANCELLING to CANCELED.
+            mockExecutionGraph.completeCancellation();
+        }
+    }
+
+    @Test
+    public void testTransitionToSuspend() throws Exception {
+        try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
+            Canceling canceling = createCancelingState(ctx, new MockExecutionGraph());
+            canceling.onEnter();
+            ctx.setExpectFinished(
+                    archivedExecutionGraph ->
+                            assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)));
+            canceling.suspend(new RuntimeException("suspend"));
+        }
+    }
+
+    @Test
+    public void testCancelIsIgnored() throws Exception {
+        try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
+            Canceling canceling = createCancelingState(ctx, new MockExecutionGraph());
+            canceling.onEnter();
+            canceling.cancel();
+            ctx.assertNoStateTransition();
+        }
+    }
+
+    @Test
+    public void testGlobalFailuresAreIgnored() throws Exception {
+        try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
+            Canceling canceling = createCancelingState(ctx, new MockExecutionGraph());
+            canceling.onEnter();
+            canceling.handleGlobalFailure(new RuntimeException("test"));
+            ctx.assertNoStateTransition();
+        }
+    }
+
+    @Test
+    public void testTaskFailuresAreIgnored() throws Exception {
+        try (MockStateWithExecutionGraphContext ctx = new MockStateWithExecutionGraphContext()) {
+            MockExecutionGraph meg = new MockExecutionGraph();
+            Canceling canceling = createCancelingState(ctx, meg);
+            canceling.onEnter();
+            // register execution at EG
+            ExecutingTest.MockExecutionJobVertex ejv =
+                    new ExecutingTest.MockExecutionJobVertex(canceling.getExecutionGraph());
+            TaskExecutionStateTransition update =
+                    new TaskExecutionStateTransition(
+                            new TaskExecutionState(
+                                    canceling.getJob().getJobID(),
+                                    ejv.getMockExecutionVertex()
+                                            .getCurrentExecutionAttempt()
+                                            .getAttemptId(),
+                                    ExecutionState.FAILED,
+                                    new RuntimeException()));
+            canceling.updateTaskExecutionState(update);
+            ctx.assertNoStateTransition();
+            assertThat(meg.isFailGlobalCalled(), is(true));
+        }
+    }
+
+    private Canceling createCancelingState(
+            MockStateWithExecutionGraphContext ctx, ExecutionGraph executionGraph) {
+        final ExecutionGraphHandler executionGraphHandler =
+                new ExecutionGraphHandler(
+                        executionGraph,
+                        log,
+                        ctx.getMainThreadExecutor(),
+                        ctx.getMainThreadExecutor());
+        final OperatorCoordinatorHandler operatorCoordinatorHandler =
+                new OperatorCoordinatorHandler(
+                        executionGraph,
+                        (throwable) -> {
+                            throw new RuntimeException("Error in test", throwable);
+                        });
+        executionGraph.transitionToRunning();
+        Canceling canceling =
+                new Canceling(
+                        ctx,
+                        executionGraph,
+                        executionGraphHandler,
+                        operatorCoordinatorHandler,
+                        log);
+        return canceling;
+    }
+
+    /**
+     * Mocked ExecutionGraph, which stays in CANCELLING, when cancel() gets called, until the
+     * "completeCancellationFuture" is completed.
+     */
+    private static class MockExecutionGraph extends ExecutionGraph {
+
+        private final CompletableFuture<?> completeCancellationFuture = new CompletableFuture<>();
+        private boolean isCancelling = false;
+        private boolean isFailGlobalCalled = false;
+
+        public MockExecutionGraph() throws IOException {
+            super(
+                    new JobInformation(
+                            new JobID(),
+                            "Test Job",
+                            new SerializedValue<>(new ExecutionConfig()),
+                            new Configuration(),
+                            Collections.emptyList(),
+                            Collections.emptyList()),
+                    TestingUtils.defaultExecutor(),
+                    TestingUtils.defaultExecutor(),
+                    AkkaUtils.getDefaultTimeout(),
+                    1,
+                    ExecutionGraph.class.getClassLoader(),
+                    VoidBlobWriter.getInstance(),
+                    PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
+                            new Configuration()),
+                    NettyShuffleMaster.INSTANCE,
+                    NoOpJobMasterPartitionTracker.INSTANCE,
+                    ScheduleMode.EAGER,
+                    NoOpExecutionDeploymentListener.get(),
+                    (execution, newState) -> {},
+                    0L);
+            this.setJsonPlan(""); // field must not be null for ArchivedExecutionGraph creation
+        }
+
+        void completeCancellation() {
+            completeCancellationFuture.complete(null);
+        }
+
+        public boolean isCancelling() {
+            return isCancelling;
+        }
+
+        public boolean isFailGlobalCalled() {
+            return isFailGlobalCalled;
+        }
+
+        // overrides for the tests
+        @Override
+        public void cancel() {
+            super.cancel();
+            this.isCancelling = true;
+        }
+
+        @Override
+        public void failGlobal(Throwable t) {
+            isFailGlobalCalled = true;
+        }
+
+        @Override
+        protected FutureUtils.ConjunctFuture<Void> cancelVerticesAsync() {
+            return FutureUtils.completeAll(Collections.singleton(completeCancellationFuture));
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
index d1b51d8..e7459ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/ExecutingTest.java
@@ -246,7 +246,7 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    private TaskExecutionStateTransition createFailingStateTransition(JobID jobId) {
+    private static TaskExecutionStateTransition createFailingStateTransition(JobID jobId) {
         return new TaskExecutionStateTransition(
                 new TaskExecutionState(
                         jobId,
@@ -568,17 +568,15 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    private static class MockExecutionJobVertex extends ExecutionJobVertex {
+    static class MockExecutionJobVertex extends ExecutionJobVertex {
         private final MockExecutionVertex mockExecutionVertex;
 
         MockExecutionJobVertex() throws JobException, JobExecutionException {
-            super(
-                    TestingExecutionGraphBuilder.newBuilder().build(),
-                    new JobVertex("test"),
-                    1,
-                    1,
-                    Time.milliseconds(1L),
-                    1L);
+            this(TestingExecutionGraphBuilder.newBuilder().build());
+        }
+
+        MockExecutionJobVertex(ExecutionGraph executionGraph) throws JobException {
+            super(executionGraph, new JobVertex("test"), 1, 1, Time.milliseconds(1L), 1L);
             mockExecutionVertex = new MockExecutionVertex(this);
         }
 
@@ -587,12 +585,16 @@ public class ExecutingTest extends TestLogger {
             return new ExecutionVertex[] {mockExecutionVertex};
         }
 
+        public MockExecutionVertex getMockExecutionVertex() {
+            return mockExecutionVertex;
+        }
+
         public boolean isExecutionDeployed() {
             return mockExecutionVertex.isDeployed();
         }
     }
 
-    private static class MockExecutionVertex extends ExecutionVertex {
+    static class MockExecutionVertex extends ExecutionVertex {
         private boolean deployed = false;
 
         MockExecutionVertex(ExecutionJobVertex jobVertex) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java
index a34f5ab..927eb41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/RestartingTest.java
@@ -183,7 +183,7 @@ public class RestartingTest extends TestLogger {
         }
     }
 
-    private static class CancellableExecutionGraph extends ExecutionGraph {
+    static class CancellableExecutionGraph extends ExecutionGraph {
         private boolean cancelled = false;
 
         CancellableExecutionGraph() throws IOException {