Posted to issues@flink.apache.org by "zentol (via GitHub)" <gi...@apache.org> on 2023/04/06 13:20:11 UTC

[GitHub] [flink] zentol opened a new pull request, #22364: [FLINK-31744] Include JobVertex info in sparse EG

zentol opened a new pull request, #22364:
URL: https://github.com/apache/flink/pull/22364

   Include all JobVertex-level information, such as the vertex ID/name and the maxParallelism, in the sparse execution graph that the Adaptive Scheduler creates while in the WaitingForResources state.
   
   This removes the need for the recently introduced `JobMasterGateway#getMaxParallelismPerVertex`, so I propose to remove that method.
   We could keep it, though, if we are concerned that creating the sparse EG is too expensive.
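To illustrate why the dedicated gateway method becomes redundant, here is a minimal standalone sketch. `SparseGraph`, `VertexInfo` and `maxParallelismPerVertex` are hypothetical stand-ins, not Flink classes; the only assumption is that the sparse graph carries one entry per job vertex with its already-resolved max parallelism, so a per-vertex lookup can be derived from the graph itself.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SparseGraphSketch {

    // Hypothetical stand-in for the per-vertex info kept in the sparse graph.
    record VertexInfo(String id, String name, int parallelism, int maxParallelism) {}

    // Hypothetical stand-in for a sparse archived graph: job-level fields plus vertices.
    record SparseGraph(String jobName, String jobStatus, List<VertexInfo> vertices) {}

    // Once the sparse graph carries vertex info, the max parallelism per vertex
    // can be read directly from it instead of through a separate gateway call.
    static Map<String, Integer> maxParallelismPerVertex(SparseGraph graph) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (VertexInfo vertex : graph.vertices()) {
            result.put(vertex.id(), vertex.maxParallelism());
        }
        return result;
    }

    public static void main(String[] args) {
        SparseGraph graph =
                new SparseGraph(
                        "example-job",
                        "INITIALIZING",
                        List.of(
                                new VertexInfo("v1", "Source", 2, 128),
                                new VertexInfo("v2", "Sink", 1, 256)));
        System.out.println(maxParallelismPerVertex(graph)); // {v1=128, v2=256}
    }
}
```

The trade-off raised above still applies: the lookup is cheap once the graph exists, but building the sparse graph is the cost being weighed.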


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22364:
URL: https://github.com/apache/flink/pull/22364#issuecomment-1499063492

   ## CI report:
   
   * 6109c840dde0f7a435d5076081c651d901272704 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] dmvk commented on a diff in pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on code in PR #22364:
URL: https://github.com/apache/flink/pull/22364#discussion_r1162826386


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -195,6 +195,34 @@ void testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws E
         ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph);
     }
 
+    @Test
+    void testArchivedJobVerticesPresent() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        jobGraph.setSnapshotSettings(
+                new JobCheckpointingSettings(
+                        CheckpointCoordinatorConfiguration.builder().build(), null));
+
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .build(EXECUTOR_RESOURCE.getExecutor())
+                        .getArchivedExecutionGraph(JobStatus.INITIALIZING, null);
+
+        ArchivedExecutionJobVertex jobVertex =
+                archivedExecutionGraph.getJobVertex(JOB_VERTEX.getID());
+        assertThat(jobVertex)
+                .isNotNull()
+                .satisfies(
+                        archived -> {
+                            assertThat(archived.getParallelism())
+                                    .isEqualTo(JOB_VERTEX.getParallelism());
+                            // JOB_VERTEX.maxP == -1, but we want the actual maxP determined by the
+                            // scheduler
+                            assertThat(archived.getMaxParallelism()).isEqualTo(128);
+                        });

Review Comment:
   nit:
   ```suggestion
           assertThat(jobVertex).isNotNull();
           assertThat(jobVertex.getParallelism()).isEqualTo(JOB_VERTEX.getParallelism());
           // JOB_VERTEX.maxP == -1, but we want the actual maxP determined by the scheduler
           assertThat(jobVertex.getMaxParallelism()).isEqualTo(128);
   ```
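The expected value of 128 in this test depends on how a max parallelism is derived when the vertex leaves it unset (-1). As far as I understand Flink's `KeyGroupRangeAssignment#computeDefaultMaxParallelism`, the rule rounds `parallelism + parallelism / 2` up to a power of two and clamps the result to [128, 32768]. The sketch below re-implements that rule standalone, as an assumption, to show why a small test parallelism yields 128; it is not the Flink code, and the bounds are assumed to mirror Flink's defaults.

```java
public class DefaultMaxParallelismSketch {

    // Bounds assumed to mirror Flink's defaults (2^7 and 2^15).
    static final int LOWER_BOUND = 1 << 7; // 128
    static final int UPPER_BOUND = 1 << 15; // 32768

    // Smallest power of two >= x, for x >= 1 (long to avoid int overflow).
    static long roundUpToPowerOfTwo(long x) {
        return x <= 1 ? 1 : Long.highestOneBit(x - 1) << 1;
    }

    // Assumed rule for an unset (-1) max parallelism: 1.5x the parallelism,
    // rounded up to a power of two, clamped to [LOWER_BOUND, UPPER_BOUND].
    static int defaultMaxParallelism(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        long candidate = roundUpToPowerOfTwo((long) parallelism + parallelism / 2);
        return (int) Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        System.out.println(defaultMaxParallelism(4)); // 128
        System.out.println(defaultMaxParallelism(100)); // 256
        System.out.println(defaultMaxParallelism(30000)); // 32768
    }
}
```

Under this assumption, any parallelism up to 85 maps to 128, which is why the test can hard-code that value.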





[GitHub] [flink] zentol merged pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol merged PR #22364:
URL: https://github.com/apache/flink/pull/22364




[GitHub] [flink] zentol commented on pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on PR #22364:
URL: https://github.com/apache/flink/pull/22364#issuecomment-1503178156

   Added tests, which also revealed a bug in the archiving :see_no_evil: 




[GitHub] [flink] dmvk commented on a diff in pull request #22364: [FLINK-31744] Include JobVertex info in sparse EG

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on code in PR #22364:
URL: https://github.com/apache/flink/pull/22364#discussion_r1162427721


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java:
##########
@@ -362,8 +367,59 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
             @Nullable Throwable throwable,
             @Nullable JobCheckpointingSettings checkpointingSettings,
             long initializationTimestamp) {
+        return createSparseArchivedExecutionGraph(
+                jobId,
+                jobName,
+                jobStatus,
+                Collections.emptyList(),
+                throwable,
+                checkpointingSettings,
+                initializationTimestamp);
+    }
+
+    public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVertices(
+            JobID jobId,
+            String jobName,
+            JobStatus jobStatus,
+            @Nullable Throwable throwable,
+            @Nullable JobCheckpointingSettings checkpointingSettings,
+            long initializationTimestamp,
+            Iterable<JobVertex> jobVertices,
+            VertexParallelismStore initialParallelismStore) {
+        List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
+        for (JobVertex jobVertex : jobVertices) {
+            VertexParallelismInformation parallelismInfo =

Review Comment:
   nit:
   ```suggestion
           final List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
           for (JobVertex jobVertex : jobVertices) {
               final VertexParallelismInformation parallelismInfo =
   ```
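The method under review iterates the job vertices in creation order and looks up each vertex's parallelism information before archiving it. The standalone sketch below mirrors that shape, including the `final` locals suggested above. `Vertex`, `ParallelismInfo`, `ArchivedVertex` and the `Map`-based store are hypothetical simplifications, not Flink's `JobVertex`, `VertexParallelismInformation` or `VertexParallelismStore`, and failing on a missing entry is an assumed design choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ArchiveVerticesSketch {

    record Vertex(String id, String name) {}

    record ParallelismInfo(int parallelism, int maxParallelism) {}

    record ArchivedVertex(String id, String name, int parallelism, int maxParallelism) {}

    // Archives each vertex in iteration (creation) order, resolving its
    // parallelism and max parallelism from the store.
    static List<ArchivedVertex> archiveInCreationOrder(
            Iterable<Vertex> vertices, Map<String, ParallelismInfo> parallelismStore) {
        final List<ArchivedVertex> archivedVerticesInCreationOrder = new ArrayList<>();
        for (Vertex vertex : vertices) {
            final ParallelismInfo parallelismInfo = parallelismStore.get(vertex.id());
            if (parallelismInfo == null) {
                // Every vertex is expected to have an entry in the store.
                throw new IllegalStateException("No parallelism info for vertex " + vertex.id());
            }
            archivedVerticesInCreationOrder.add(
                    new ArchivedVertex(
                            vertex.id(),
                            vertex.name(),
                            parallelismInfo.parallelism(),
                            parallelismInfo.maxParallelism()));
        }
        return archivedVerticesInCreationOrder;
    }

    public static void main(String[] args) {
        List<ArchivedVertex> archived =
                archiveInCreationOrder(
                        List.of(new Vertex("v1", "Source"), new Vertex("v2", "Sink")),
                        Map.of(
                                "v1", new ParallelismInfo(2, 128),
                                "v2", new ParallelismInfo(1, 128)));
        archived.forEach(System.out::println);
    }
}
```

Using a `List` filled in iteration order keeps the archived vertices in the same order as the input, regardless of the store's own iteration order.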


