You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:57 UTC

[flink] 04/06: [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44c00dbb4a083ff197442e2ce6440558be252787
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:39:40 2022 +0800

    [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.
---
 .../executiongraph/AccessExecutionVertex.java      |   9 +
 .../executiongraph/ArchivedExecutionVertex.java    |  26 ++-
 .../ArchivedSpeculativeExecutionVertex.java        |  52 -----
 .../executiongraph/SpeculativeExecutionVertex.java |   4 +-
 .../ArchivedExecutionGraphTestUtils.java           |  21 ++
 ...xecutionVertexWithSpeculativeExecutionTest.java | 223 +++++++++++++++++++++
 6 files changed, 280 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
index 6775424ba4b..f8d4581c7e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.Optional;
 
 /** Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}. */
@@ -46,6 +47,14 @@ public interface AccessExecutionVertex {
      */
     AccessExecution getCurrentExecutionAttempt();
 
+    /**
+     * Returns the current executions for this execution vertex. The returned collection must
+     * contain the current execution attempt.
+     *
+     * @return current executions
+     */
+    <T extends AccessExecution> Collection<T> getCurrentExecutions();
+
     /**
      * Returns the current {@link ExecutionState} for this execution vertex.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e5feba5445a..d9f8448a702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,15 +44,29 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        ArrayList<AccessExecution> currentExecutionList =
+                new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutionList.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {
+            if (execution != vertexCurrentExecution) {
+                currentExecutionList.add(execution.archive());
+            }
+        }
+        currentExecutions = Collections.unmodifiableList(currentExecutionList);
     }
 
+    @VisibleForTesting
     public ArchivedExecutionVertex(
             int subTaskIndex,
             String taskNameWithSubtask,
@@ -58,6 +76,7 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
         this.taskNameWithSubtask = checkNotNull(taskNameWithSubtask);
         this.currentExecution = checkNotNull(currentExecution);
         this.executionHistory = checkNotNull(executionHistory);
+        this.currentExecutions = Collections.singletonList(currentExecution);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -79,6 +98,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return currentExecutions;
+    }
+
     @Override
     public ExecutionState getExecutionState() {
         return currentExecution.getState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
deleted file mode 100644
index 263049414c5..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-/**
- * {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link
- * SpeculativeExecutionVertex}.
- */
-public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex {
-
-    private static final long serialVersionUID = 1L;
-
-    public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) {
-        super(
-                vertex.getParallelSubtaskIndex(),
-                vertex.getTaskNameWithSubtaskIndex(),
-                vertex.getCurrentExecutionAttempt().archive(),
-                getCopyOfExecutionHistory(vertex));
-    }
-
-    private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) {
-        final ExecutionHistory executionHistory =
-                ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex);
-
-        // add all the executions to the execution history except for the only admitted current
-        // execution
-        final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
-        for (Execution execution : vertex.getCurrentExecutions()) {
-            if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) {
-                executionHistory.add(execution.archive());
-            }
-        }
-
-        return executionHistory;
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 41ef1c0ac2f..71e28779637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -285,8 +285,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
     }
 
     @Override
-    public ArchivedSpeculativeExecutionVertex archive() {
-        return new ArchivedSpeculativeExecutionVertex(this);
+    public ArchivedExecutionVertex archive() {
+        return new ArchivedExecutionVertex(this);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
index 55a10cd5396..fc269901b34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
@@ -24,6 +24,10 @@ import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -68,6 +72,23 @@ class ArchivedExecutionGraphTestUtils {
         compareExecution(
                 runtimeVertex.getCurrentExecutionAttempt(),
                 archivedVertex.getCurrentExecutionAttempt());
+
+        compareExecutions(
+                runtimeVertex.getCurrentExecutions(), archivedVertex.getCurrentExecutions());
+    }
+
+    private static <RT extends AccessExecution, AT extends AccessExecution> void compareExecutions(
+            Collection<RT> runtimeExecutions, Collection<AT> archivedExecutions) {
+        assertThat(runtimeExecutions).hasSameSizeAs(archivedExecutions);
+
+        List<RT> sortedRuntimeExecutions = new ArrayList<>(runtimeExecutions);
+        List<AT> sortedArchivedExecutions = new ArrayList<>(archivedExecutions);
+        sortedRuntimeExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber));
+        sortedArchivedExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber));
+
+        for (int i = 0; i < runtimeExecutions.size(); i++) {
+            compareExecution(sortedRuntimeExecutions.get(i), sortedArchivedExecutions.get(i));
+        }
     }
 
     private static void compareExecution(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java
new file mode 100644
index 00000000000..c9e83882b1c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for the {@link ArchivedExecutionVertex} created from a {@link SpeculativeExecutionVertex}.
+ */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.cancel();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.suspend();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.fail(new Exception("Forced test failure."));
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        ev.markFailed(new Exception("Forced test failure."));
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assertThat(jv).isNotNull();
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+        ev.archiveFailedExecution(e2.getAttemptId());
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // states added later are more likely to reach the FINISHED state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+
+            // Check the ArchivedExecutionVertex in each state.
+            ArchivedExecutionVertex aev = ev.archive();
+            ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+        }
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assertThat(jv).isNotNull();
+        return (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+}