Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/21 08:19:41 UTC

[GitHub] [flink] zhuzhurk commented on a diff in pull request #20296: [FLINK-28588] Enhance REST API for Speculative Execution

zhuzhurk commented on code in PR #20296:
URL: https://github.com/apache/flink/pull/20296#discussion_r926285695


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -28,14 +28,18 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
     private final int numberRegisteredSlots;
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagaers;

Review Comment:
   numberBlockedTaskManagaers -> numberBlockedTaskManagers



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -65,6 +73,14 @@ public int getNumberFreeSlots() {
         return numberFreeSlots;
     }
 
+    public int getNumberBlockedTaskManagaers() {

Review Comment:
   getNumberBlockedTaskManagaers -> getNumberBlockedTaskManagers



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);
+        currentExecutions = new ArrayList<>(vertex.getCurrentExecutions().size());
+        currentExecution = vertexCurrentExecution.archive();
+        currentExecutions.add(currentExecution);
+        for (Execution execution : vertex.getCurrentExecutions()) {
+            if (execution != vertexCurrentExecution) {
+                currentExecutions.add(execution.archive());
+            }
+        }
     }
 
     public ArchivedExecutionVertex(

Review Comment:
   Let's annotate it with `@VisibleForTesting`, because it is now only used for testing purposes.
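   A minimal sketch of what that could look like, using Flink's `org.apache.flink.annotation.VisibleForTesting` annotation (assuming the field-based constructor begun at the end of this hunk is the test-only one, and that its body stays unchanged):
   ```java
   import org.apache.flink.annotation.VisibleForTesting;

   @VisibleForTesting
   public ArchivedExecutionVertex(
           /* existing parameter list unchanged */) {
       // existing constructor body unchanged
   }
   ```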



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java:
##########
@@ -46,6 +47,8 @@ public interface AccessExecutionVertex {
      */
     AccessExecution getCurrentExecutionAttempt();
 
+    <T extends AccessExecution> Collection<T> getCurrentExecutions();

Review Comment:
   A Javadoc should be added for it.
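   For illustration, a rough sketch of such a Javadoc (the wording below is only a suggestion, not taken from the PR):
   ```java
   /**
    * Returns all execution attempts that are currently active for this vertex.
    * With speculative execution enabled there can be several attempts at the
    * same time; otherwise the collection only contains the current attempt.
    *
    * @return the currently active execution attempts
    */
   <T extends AccessExecution> Collection<T> getCurrentExecutions();
   ```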



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java:
##########
@@ -64,6 +66,7 @@ public TaskManagerDetailsInfo(
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
             @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked,

Review Comment:
   Maybe it can just be a `boolean`? See the above comment.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java:
##########
@@ -103,4 +105,33 @@ public void testJobDetailsCompatibleUnmarshalling() throws IOException {
 
         assertEquals(expected, unmarshalled);
     }
+
+    @Test
+    public void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
+        Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>();
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2);
+        currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4);
+        currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1);
+
+        final JobDetails expected =
+                new JobDetails(
+                        new JobID(),
+                        "foobar",
+                        1L,
+                        10L,
+                        9L,
+                        JobStatus.RUNNING,
+                        8L,
+                        new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
+                        42,
+                        currentExecutionAttempts);
+
+        final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+        final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+        final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
+
+        assertEquals(expected, unmarshalled);

Review Comment:
   Flink is migrating to JUnit 5, so all new tests should use JUnit 5 and AssertJ.
   You can migrate the existing tests to JUnit 5 in a hotfix commit before making the functional changes.
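   For example, a minimal sketch of this new test in JUnit 5 / AssertJ style (the test data would be built exactly as in the hunk above; only the annotation and the assertion change):
   ```java
   import org.junit.jupiter.api.Test;

   import static org.assertj.core.api.Assertions.assertThat;

   @Test
   void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException {
       // build `currentExecutionAttempts` and `expected` exactly as in the hunk above

       final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
       final JsonNode marshalled = objectMapper.valueToTree(expected);
       final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);

       assertThat(unmarshalled).isEqualTo(expected);
   }
   ```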



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java:
##########
@@ -28,14 +28,18 @@ public class ResourceOverview implements Serializable {
     private static final long serialVersionUID = 7618746920569224557L;
 
     private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW =
-            new ResourceOverview(0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
+            new ResourceOverview(0, 0, 0, 0, 0, ResourceProfile.ZERO, ResourceProfile.ZERO);
 
     private final int numberTaskManagers;
 
     private final int numberRegisteredSlots;
 
     private final int numberFreeSlots;
 
+    private final int numberBlockedTaskManagaers;
+
+    private final int numberBlockedSlots;

Review Comment:
   I think `numberBlockedFreeSlots` would be more accurate and easier to understand.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterOverviewWithVersionTest.java:
##########
@@ -31,6 +31,6 @@ protected Class<ClusterOverviewWithVersion> getTestResponseClass() {
 
     @Override
     protected ClusterOverviewWithVersion getTestResponseInstance() {
-        return new ClusterOverviewWithVersion(1, 3, 3, 7, 4, 2, 0, "version", "commit");
+        return new ClusterOverviewWithVersion(2, 6, 3, 1, 3, 7, 4, 2, 0, "version", "commit");

Review Comment:
   It's better to keep the other params as they are and only add values for the newly introduced params.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -142,22 +142,37 @@ public static class TaskQueryScopeInfo extends QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;
 
         public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) {

Review Comment:
   It looks to me that these constructors are not invoked in many places, so I prefer changing the original constructors and fixing the tests instead of adding new ones.
   
   Besides that, the default attemptNumber should be 0 instead of -1.
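   A rough sketch of what changing the original constructors could look like (the exact constructor shapes are assumed here and may differ from the PR):
   ```java
   public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, int attemptNumber) {
       this(jobID, vertexid, subtaskIndex, attemptNumber, "");
   }

   public TaskQueryScopeInfo(
           String jobID, String vertexid, int subtaskIndex, int attemptNumber, String scope) {
       super(scope);
       this.jobID = jobID;
       this.vertexID = vertexid;
       this.subtaskIndex = subtaskIndex;
       this.attemptNumber = attemptNumber;
   }
   ```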



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -671,7 +674,8 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
                                     slotManager.getRegisteredResourceOf(instanceId),
                                     slotManager.getFreeResourceOf(instanceId),
                                     taskExecutor.getHardwareDescription(),
-                                    taskExecutor.getMemoryConfiguration()),
+                                    taskExecutor.getMemoryConfiguration(),
+                                    blocked),

Review Comment:
   nit: `isBlockedTaskManager` can be invoked inline here, just like the other arguments. This makes the code read better.
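   i.e. roughly the following (a sketch, assuming the same `blocklistHandler.isBlockedTaskManager(...)` call that is used in `requestTaskManagerInfo`):
   ```java
   taskExecutor.getHardwareDescription(),
   taskExecutor.getMemoryConfiguration(),
   blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID())),
   ```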



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java:
##########
@@ -630,6 +630,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Tim
             final ResourceID resourceId = taskExecutorEntry.getKey();
             final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
 
+            boolean blocked = blocklistHandler.isBlockedTaskManager(taskExecutor.getResourceID());

Review Comment:
   It's better to make local variables `final`, if possible.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -79,6 +95,11 @@ public ArchivedExecution getCurrentExecutionAttempt() {
         return currentExecution;
     }
 
+    @Override
+    public Collection<AccessExecution> getCurrentExecutions() {
+        return currentExecutions;

Review Comment:
   ```suggestion
           return Collections.unmodifiableCollection(currentExecutions);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -142,22 +142,37 @@ public static class TaskQueryScopeInfo extends QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;

Review Comment:
   I would suggest to name it as `attemptNumber`, which is commonly used in a lot of places already.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -125,6 +136,7 @@ public TaskManagerInfo(
         this.freeResource = freeResource;
         this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
         this.memoryConfiguration = Preconditions.checkNotNull(memoryConfiguration);
+        this.blocked = blocked != null && blocked;

Review Comment:
   I'd prefer to add parentheses here to clearly show the evaluation order of the result.
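   i.e. something like (a sketch of the parenthesized form):
   ```java
   this.blocked = (blocked != null && blocked);
   ```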



##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java:
##########
@@ -495,6 +496,10 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
         final SubtaskMetricsHandler subtaskMetricsHandler =
                 new SubtaskMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
 
+        final SubtaskAttemptMetricsHandler subtaskAttemptMetricsHandler =

Review Comment:
   I think this REST API is not in the scope of this FLIP, so we should remove it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java:
##########
@@ -40,13 +43,25 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final ArchivedExecution currentExecution; // this field must never be null
 
+    private final Collection<AccessExecution> currentExecutions;
+
     // ------------------------------------------------------------------------
 
     public ArchivedExecutionVertex(ExecutionVertex vertex) {
         this.subTaskIndex = vertex.getParallelSubtaskIndex();
         this.executionHistory = getCopyOfExecutionHistory(vertex);
         this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
-        this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+
+        Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt();
+        assert vertex.getCurrentExecutions().contains(vertexCurrentExecution);

Review Comment:
   Better to use `Preconditions.checkState`.
   
   And it's better to check it in a more performant way, like:
   ```
   checkState(vertexCurrentExecution == vertex.getCurrentExecution(vertexCurrentExecution.getAttemptNumber()));
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java:
##########
@@ -113,7 +123,8 @@ public TaskManagerInfo(
             @JsonProperty(FIELD_NAME_TOTAL_RESOURCE) ResourceProfileInfo totalResource,
             @JsonProperty(FIELD_NAME_AVAILABLE_RESOURCE) ResourceProfileInfo freeResource,
             @JsonProperty(FIELD_NAME_HARDWARE) HardwareDescription hardwareDescription,
-            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration) {
+            @JsonProperty(FIELD_NAME_MEMORY) TaskExecutorMemoryConfiguration memoryConfiguration,
+            @JsonProperty(FIELD_NAME_BLOCKED) @Nullable Boolean blocked) {

Review Comment:
   Why declare `blocked` as a `@Nullable Boolean` if we will eventually turn it into a `boolean` and exclude it when it is false?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java:
##########
@@ -174,23 +189,44 @@ public static class OperatorQueryScopeInfo extends QueryScopeInfo {
         public final String jobID;
         public final String vertexID;
         public final int subtaskIndex;
+        public final int attemptNum;
         public final String operatorName;
 
         public OperatorQueryScopeInfo(

Review Comment:
   It looks to me that these constructors are not invoked in many places, so I prefer changing the original constructors and fixing the tests instead of adding new ones.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class ArchivedExecutionVertexWithSpeculativeExecutionTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(0)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(
+                        ev.getExecutionHistory()
+                                .getHistoricalExecution(1)
+                                .orElseThrow(NullPointerException::new)
+                                .getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testVertexTerminationAndJobTermination() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph eg = createExecutionGraph(jobGraph);
+        eg.transitionToRunning();
+
+        ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        final CompletableFuture<?> terminationFuture = ev.getTerminationFuture();
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        assertThat(terminationFuture.isDone()).isFalse();
+        assertThat(eg.getState()).isSameAs(JobStatus.RUNNING);
+
+        e2.cancel();
+        assertThat(terminationFuture.isDone()).isTrue();
+        assertThat(eg.getState()).isSameAs(JobStatus.FINISHED);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+
+        // the latter added state is more likely to reach FINISH state
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+        }
+
+        ArchivedExecutionVertex aev = ev.archive();
+        compareExecutionVertex(ev, aev);
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID());
+        assert jv != null;
+        return (SpeculativeExecutionVertex) jv.getTaskVertices()[0];
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+
+    private static void compareExecutionVertex(
+            AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
+        assertEquals(

Review Comment:
   JUnit assertions should be avoided. See https://flink.apache.org/contributing/code-style-and-quality-common.html#testing.
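   For reference, a sketch of the same helper in AssertJ style (which fields are compared is an assumption, since the hunk is truncated here):
   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   private static void compareExecutionVertex(
           AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
       assertThat(archivedVertex.getTaskNameWithSubtaskIndex())
               .isEqualTo(runtimeVertex.getTaskNameWithSubtaskIndex());
       assertThat(archivedVertex.getParallelSubtaskIndex())
               .isEqualTo(runtimeVertex.getParallelSubtaskIndex());
       assertThat(archivedVertex.getExecutionState())
               .isEqualTo(runtimeVertex.getExecutionState());
   }
   ```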



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org