Posted to issues@flink.apache.org by "gyfora (via GitHub)" <gi...@apache.org> on 2023/03/30 08:59:04 UTC

[GitHub] [flink] gyfora commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

gyfora commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1152949572


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future that contains the current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {

Review Comment:
   Would it make sense to simply call this `getJobResourceRequirements` instead?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            String.format(
+                                    "Another update to the job [%s] resource requirements is in progress.",
+                                    jobId)));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   Wouldn't it make sense to persist the requirements before applying them? It may be better not to apply them at all than to apply them and lose them on failover.
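
   A minimal sketch of the persist-first ordering, to make the suggestion concrete. It is not based on the actual Dispatcher code: `RequirementsStore`, `RequirementsApplier`, and `update` are hypothetical stand-ins for `jobGraphWriter`, the JobMaster gateway call, and `updateJobResourceRequirements`, and plain strings stand in for the Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical sketch of "persist first, apply second". Not Flink API. */
final class PersistThenApply {

    /** Hypothetical stand-in for the durable store (jobGraphWriter in the PR). */
    interface RequirementsStore {
        void put(String jobId, String requirements) throws Exception;
    }

    /** Hypothetical stand-in for the call that applies requirements to the running job. */
    interface RequirementsApplier {
        CompletableFuture<String> apply(String requirements);
    }

    /**
     * Persists the requirements, then applies them only if persisting succeeded.
     * A failed write fails the returned future and leaves the running job untouched.
     */
    static CompletableFuture<String> update(
            String jobId,
            String requirements,
            RequirementsStore store,
            RequirementsApplier applier) {
        return CompletableFuture.runAsync(
                        () -> {
                            try {
                                store.put(jobId, requirements);
                            } catch (Exception e) {
                                throw new CompletionException(
                                        "Could not persist the resource requirements;"
                                                + " nothing was applied.",
                                        e);
                            }
                        })
                // Reached only when the write above completed normally.
                .thenCompose(ignored -> applier.apply(requirements));
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        RequirementsApplier applier =
                req -> {
                    applied.add(req);
                    return CompletableFuture.completedFuture("ack");
                };

        // A store that always fails: the applier must never be invoked.
        CompletableFuture<String> result =
                update(
                        "job-1",
                        "parallelism=4",
                        (id, req) -> {
                            throw new java.io.IOException("store unavailable");
                        },
                        applier);
        try {
            result.join();
        } catch (CompletionException e) {
            System.out.println("update failed: " + e.getMessage());
        }
        System.out.println("applied after failed persist: " + applied);
    }
}
```

   The trade-off flips with this ordering: if the write succeeds but applying fails, the store holds requirements that the running job never adopted, so that case would need its own handling.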


