Posted to issues@flink.apache.org by "zentol (via GitHub)" <gi...@apache.org> on 2023/03/23 13:21:19 UTC

[GitHub] [flink] zentol opened a new pull request, #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

zentol opened a new pull request, #22251:
URL: https://github.com/apache/flink/pull/22251

   Extend the JobGraphWriter to persist the JobResourceRequirements in the JobGraph.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147476097


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   Per my understanding, in the worst case, a contender JM **_might_** discover that it's not the leader anymore while adding a checkpoint to ZK/etcd.
   
   CheckpointCounters use `getAndIncrement` for new checkpoints.





[GitHub] [flink] flinkbot commented on pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22251:
URL: https://github.com/apache/flink/pull/22251#issuecomment-1481190173

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a86ec3d522175fe13e69a6262a8fa39853817d7f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a86ec3d522175fe13e69a6262a8fa39853817d7f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a86ec3d522175fe13e69a6262a8fa39853817d7f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zentol commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147432019


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   A contender JM shouldn't do any writes.
   I think that's a general assumption that we're making, also for things like checkpoint counters etc.





[GitHub] [flink] zentol merged pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol merged PR #22251:
URL: https://github.com/apache/flink/pull/22251




[GitHub] [flink] rkhachatryan commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147456868


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {

Review Comment:
   Thanks!





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147384435


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(

Review Comment:
   I think `java.util.NoSuchElementException` would be more appropriate here because it doesn't necessarily involve files.
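
A minimal sketch of that suggestion, using a hypothetical in-memory store rather than Flink's actual `DefaultJobGraphStore`. The class name `InMemoryGraphStore` and its methods are assumptions made for illustration only; the point is just that a missing entry is reported without implying file I/O.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a job graph store; not Flink's DefaultJobGraphStore.
final class InMemoryGraphStore {
    private final Map<String, String> graphsByJobId = new HashMap<>();

    void put(String jobId, String graph) {
        graphsByJobId.put(jobId, graph);
    }

    // Signals a missing entry without implying that a file is involved.
    String require(String jobId) {
        final String graph = graphsByJobId.get(jobId);
        if (graph == null) {
            throw new NoSuchElementException(
                    String.format("JobGraph for job [%s] was not found.", jobId));
        }
        return graph;
    }
}
```

Note that `NoSuchElementException` is unchecked, whereas `FileNotFoundException` is a checked `IOException`, so callers that caught `IOException` would no longer see this case.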



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java:
##########
@@ -368,6 +376,61 @@ public void testLocalCleanupShouldReleaseHandle() throws Exception {
         assertThat(actual, is(testingJobGraph.getJobID().toString()));
     }
 
+    @Test
+    public void testRecoverPersistedJobResourceRequirements() throws Exception {
+        final Map<String, RetrievableStateHandle<JobGraph>> handles = new HashMap<>();
+        final TestingStateHandleStore<JobGraph> stateHandleStore =
+                builder.setAddFunction(
+                                (key, state) -> {
+                                    final RetrievableStateHandle<JobGraph> handle =
+                                            jobGraphStorageHelper.store(state);
+                                    handles.put(key, handle);
+                                    return handle;
+                                })
+                        .setGetFunction(
+                                key -> {
+                                    final RetrievableStateHandle<JobGraph> handle =
+                                            handles.get(key);
+                                    if (handle != null) {
+                                        return handle;
+                                    }
+                                    throw new StateHandleStore.NotExistException("Does not exist.");
+                                })
+                        .build();
+
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(new JobVertexID(), 1, 1)
+                        .build();
+
+        final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore);
+        jobGraphStore.putJobGraph(testingJobGraph);
+        jobGraphStore.putJobResourceRequirements(
+                testingJobGraph.getJobID(), jobResourceRequirements);
+
+        final Optional<JobResourceRequirements> maybeRecovered =
+                JobResourceRequirements.readFromJobGraph(
+                        Objects.requireNonNull(
+                                jobGraphStore.recoverJobGraph(testingJobGraph.getJobID())));
+        Assertions.assertThat(maybeRecovered).get().isEqualTo(jobResourceRequirements);

Review Comment:
   Maybe also check that new resource requirements overwrite already written ones?
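
The shape of such an overwrite check could look roughly like the sketch below. It uses a hypothetical map-backed store (`RequirementsStoreSketch`) and a plain integer in place of `JobResourceRequirements`, so it only illustrates the write-twice-then-recover pattern, not the real test fixtures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in that stores one requirements value per job ID.
final class RequirementsStoreSketch {
    private final Map<String, Integer> parallelismByJobId = new HashMap<>();

    // A second put for the same job replaces the earlier value.
    void putRequirements(String jobId, int parallelism) {
        parallelismByJobId.put(jobId, parallelism);
    }

    Optional<Integer> recoverRequirements(String jobId) {
        return Optional.ofNullable(parallelismByJobId.get(jobId));
    }

    // Mirrors the suggested check: write twice, recover, compare with the second value.
    static boolean secondWriteWins() {
        final RequirementsStoreSketch store = new RequirementsStoreSketch();
        store.putRequirements("job-1", 1);
        store.putRequirements("job-1", 4);
        return store.recoverRequirements("job-1").equals(Optional.of(4));
    }
}
```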



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {

Review Comment:
   Could you explain why we need synchronization here?
   It is already performed inside `recoverJobGraph` and `putJobGraph`. The latter also checks the version before writing, so there shouldn't be consistency issues IIUC.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   Could you explain how we prevent concurrent writes?
   I'm concerned about a case where a stand-by JM runs through the same code path.
   
   IIUC, that was not possible before, because it was called only during job submission. But now it will also be called for existing jobs.





[GitHub] [flink] zentol commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147432019


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   A contender JM shouldn't do any writes.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147851855


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   I think you're right, the only impact will be overwriting resource requirements, which is acceptable.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147383681


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   Could you explain how we prevent concurrent writes?
   I'm concerned about a case where a contender JM runs through the same code path.
   
   IIUC, that was not possible before, because it was called only during job submission. 
   But now it will also be called for existing jobs, so it might be possible.





[GitHub] [flink] zentol commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147435369


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {

Review Comment:
   To prevent this from running concurrently with a `globalCleanupAsync`, which could result in a job graph being written after the cleanup.
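
To illustrate the reasoning, here is a simplified model in which the read-modify-write and the cleanup share one lock. `LockedStoreSketch` and its methods are hypothetical and much simpler than the real store (in particular, the cleanup here is synchronous); the sketch only shows why holding the lock across both the read and the write keeps a cleanup from landing in between.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical model of a store whose update and cleanup share one lock.
final class LockedStoreSketch {
    private final Object lock = new Object();
    private final Map<String, String> graphsByJobId = new HashMap<>();

    void putGraph(String jobId, String graph) {
        synchronized (lock) {
            graphsByJobId.put(jobId, graph);
        }
    }

    // Read, modify, and write back as one critical section, so a cleanup
    // cannot slip in between the read and the write.
    void putRequirements(String jobId, String requirements) {
        synchronized (lock) {
            final String graph = graphsByJobId.get(jobId);
            if (graph == null) {
                throw new NoSuchElementException("No graph for job " + jobId);
            }
            graphsByJobId.put(jobId, graph + "+" + requirements);
        }
    }

    void globalCleanup(String jobId) {
        synchronized (lock) {
            graphsByJobId.remove(jobId);
        }
    }

    boolean contains(String jobId) {
        synchronized (lock) {
            return graphsByJobId.containsKey(jobId);
        }
    }
}
```

Java monitors are reentrant, so wrapping calls that themselves synchronize on the same lock object in an outer `synchronized` block does not deadlock; it only widens the critical section.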





[GitHub] [flink] zentol commented on a diff in pull request #22251: [FLINK-31591] JobGraphWriter persists JobResourceRequirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147690649


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   That can indeed happen. It's unlikely to be a _problem_ though, because even in the worst case (i.e., a JM is writing the requirements while losing leadership, and another one gains leadership and starts reading job graphs before that write is complete, which is already unlikely) we would only "forget" the last-set requirements after a failover.
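
The interleaving described above can be replayed deterministically with a shared map standing in for the HA store. Everything in this sketch (`FailoverReplaySketch`, the string values) is hypothetical; it only shows that the new leader ends up running with the older requirements while the store holds the newer ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay of the interleaving described above; not Flink code.
final class FailoverReplaySketch {
    // Returns {what the new leader recovered, what the store holds afterwards}.
    static String[] replay() {
        final Map<String, String> sharedStore = new HashMap<>();
        sharedStore.put("job-1", "requirements-v1");

        // The new leader reads job graphs before the old leader's write lands.
        final String recoveredByNewLeader = sharedStore.get("job-1");

        // The old leader's in-flight write completes afterwards.
        sharedStore.put("job-1", "requirements-v2");

        return new String[] {recoveredByNewLeader, sharedStore.get("job-1")};
    }
}
```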


