You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/08/01 10:30:57 UTC
[flink] 04/06: [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 44c00dbb4a083ff197442e2ce6440558be252787
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jul 26 16:39:40 2022 +0800
[FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex.
---
.../executiongraph/AccessExecutionVertex.java | 9 +
.../executiongraph/ArchivedExecutionVertex.java | 26 ++-
.../ArchivedSpeculativeExecutionVertex.java | 52 -----
.../executiongraph/SpeculativeExecutionVertex.java | 4 +-
.../ArchivedExecutionGraphTestUtils.java | 21 ++
...xecutionVertexWithSpeculativeExecutionTest.java | 223 +++++++++++++++++++++
6 files changed, 280 insertions(+), 55 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
index 6775424ba4b..f8d4581c7e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import java.util.Collection;
import java.util.Optional;
/** Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}. */
@@ -46,6 +47,14 @@ public interface AccessExecutionVertex {
*/
AccessExecution getCurrentExecutionAttempt();
+ /**
+ * Returns the current executions for this execution vertex. The returned collection must
+ * contain the current execution attempt.
+ *
+ * @return current executions
+ */
+ <T extends AccessExecution> Collection<T> getCurrentExecutions();
+
/**
* Returns the current {@link ExecutionState} for this execution vertex.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index e5feba5445a..d9f8448a702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -18,10 +18,14 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,15 +44,29 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
private final ArchivedExecution currentExecution; // this field must never be null
+ private final Collection<AccessExecution> currentExecutions;
+
// ------------------------------------------------------------------------
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
this.executionHistory = getCopyOfExecutionHistory(vertex);
this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
- this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+ Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+ ArrayList<AccessExecution> currentExecutionList =
+ new ArrayList<>(vertex.getCurrentExecutions().size());
+ currentExecution = vertexCurrentExecution.archive();
+ currentExecutionList.add(currentExecution);
+ for (Execution execution : vertex.getCurrentExecutions()) {
+ if (execution != vertexCurrentExecution) {
+ currentExecutionList.add(execution.archive());
+ }
+ }
+ currentExecutions = Collections.unmodifiableList(currentExecutionList);
}
+ @VisibleForTesting
public ArchivedExecutionVertex(
int subTaskIndex,
String taskNameWithSubtask,
@@ -58,6 +76,7 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
this.taskNameWithSubtask = checkNotNull(taskNameWithSubtask);
this.currentExecution = checkNotNull(currentExecution);
this.executionHistory = checkNotNull(executionHistory);
+ this.currentExecutions = Collections.singletonList(currentExecution);
}
// --------------------------------------------------------------------------------------------
@@ -79,6 +98,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
return currentExecution;
}
+ @Override
+ public Collection<AccessExecution> getCurrentExecutions() {
+ return currentExecutions;
+ }
+
@Override
public ExecutionState getExecutionState() {
return currentExecution.getState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
deleted file mode 100644
index 263049414c5..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-/**
- * {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link
- * SpeculativeExecutionVertex}.
- */
-public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex {
-
- private static final long serialVersionUID = 1L;
-
- public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) {
- super(
- vertex.getParallelSubtaskIndex(),
- vertex.getTaskNameWithSubtaskIndex(),
- vertex.getCurrentExecutionAttempt().archive(),
- getCopyOfExecutionHistory(vertex));
- }
-
- private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) {
- final ExecutionHistory executionHistory =
- ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex);
-
- // add all the executions to the execution history except for the only admitted current
- // execution
- final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
- for (Execution execution : vertex.getCurrentExecutions()) {
- if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) {
- executionHistory.add(execution.archive());
- }
- }
-
- return executionHistory;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 41ef1c0ac2f..71e28779637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -285,8 +285,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
}
@Override
- public ArchivedSpeculativeExecutionVertex archive() {
- return new ArchivedSpeculativeExecutionVertex(this);
+ public ArchivedExecutionVertex archive() {
+ return new ArchivedExecutionVertex(this);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
index 55a10cd5396..fc269901b34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java
@@ -24,6 +24,10 @@ import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -68,6 +72,23 @@ class ArchivedExecutionGraphTestUtils {
compareExecution(
runtimeVertex.getCurrentExecutionAttempt(),
archivedVertex.getCurrentExecutionAttempt());
+
+ compareExecutions(
+ runtimeVertex.getCurrentExecutions(), archivedVertex.getCurrentExecutions());
+ }
+
+ private static <RT extends AccessExecution, AT extends AccessExecution> void compareExecutions(
+ Collection<RT> runtimeExecutions, Collection<AT> archivedExecutions) {
+ assertThat(runtimeExecutions).hasSameSizeAs(archivedExecutions);
+
+ List<RT> sortedRuntimeExecutions = new ArrayList<>(runtimeExecutions);
+ List<AT> sortedArchivedExecutions = new ArrayList<>(archivedExecutions);
+ sortedRuntimeExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber));
+ sortedArchivedExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber));
+
+ for (int i = 0; i < runtimeExecutions.size(); i++) {
+ compareExecution(sortedRuntimeExecutions.get(i), sortedArchivedExecutions.get(i));
+ }
}
private static void compareExecution(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java
new file mode 100644
index 00000000000..c9e83882b1c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for the {@link ArchivedExecutionVertex} created from a {@link SpeculativeExecutionVertex}.
+ */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+ @RegisterExtension
+ private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
+
+ private TestingInternalFailuresListener internalFailuresListener;
+
+ @BeforeEach
+ void setUp() {
+ internalFailuresListener = new TestingInternalFailuresListener();
+ }
+
+ @Test
+ void testCreateSpeculativeExecution() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testResetExecutionVertex() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+ e1.transitionState(ExecutionState.RUNNING);
+ e1.markFinished();
+ e2.cancel();
+ ev.resetForNewExecution();
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testCancel() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ ev.cancel();
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testSuspend() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ ev.suspend();
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testFail() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ ev.fail(new Exception("Forced test failure."));
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testMarkFailed() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ ev.markFailed(new Exception("Forced test failure."));
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testVertexTerminationAndJobTermination() throws Exception {
+ final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+ final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+ eg.transitionToRunning();
+
+ ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+ assertThat(jv).isNotNull();
+ final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+ e1.transitionState(ExecutionState.RUNNING);
+ e1.markFinished();
+ e2.cancel();
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testArchiveFailedExecutions() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.RUNNING);
+
+ final Execution e2 = ev.createNewSpeculativeExecution(0);
+ e2.transitionState(ExecutionState.FAILED);
+ ev.archiveFailedExecution(e2.getAttemptId());
+
+ final Execution e3 = ev.createNewSpeculativeExecution(0);
+ e3.transitionState(ExecutionState.RUNNING);
+ e1.transitionState(ExecutionState.FAILED);
+ ev.archiveFailedExecution(e1.getAttemptId());
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testArchiveTheOnlyCurrentExecution() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.FAILED);
+
+ ev.archiveFailedExecution(e1.getAttemptId());
+
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+
+ @Test
+ void testGetExecutionState() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.CANCELED);
+
+ // the latter added state is more likely to reach FINISH state
+ final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+ statesSortedByPriority.add(ExecutionState.FAILED);
+ statesSortedByPriority.add(ExecutionState.CANCELING);
+ statesSortedByPriority.add(ExecutionState.CREATED);
+ statesSortedByPriority.add(ExecutionState.SCHEDULED);
+ statesSortedByPriority.add(ExecutionState.DEPLOYING);
+ statesSortedByPriority.add(ExecutionState.INITIALIZING);
+ statesSortedByPriority.add(ExecutionState.RUNNING);
+ statesSortedByPriority.add(ExecutionState.FINISHED);
+
+ for (ExecutionState state : statesSortedByPriority) {
+ final Execution execution = ev.createNewSpeculativeExecution(0);
+ execution.transitionState(state);
+
+ // Check the AchievedExecutionVertex in each state.
+ ArchivedExecutionVertex aev = ev.archive();
+ ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev);
+ }
+ }
+
+ private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+ final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+ final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+ final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+ ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+ assertThat(jv).isNotNull();
+ return (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+ }
+
+ private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+ final ExecutionGraph executionGraph =
+ TestingDefaultExecutionGraphBuilder.newBuilder()
+ .setJobGraph(jobGraph)
+ .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+ return executionGraph;
+ }
+}