Posted to issues@flink.apache.org by "rkhachatryan (via GitHub)" <gi...@apache.org> on 2023/03/30 09:33:06 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1152987364


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {

Review Comment:
   The set is currently not thread-safe.
   Is this method always going to be executed by the same (main?) thread?
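   For reference, a minimal sketch of the second option, in case the method is not confined to one thread. `UpdateGuard` and `runExclusively` are hypothetical names, not Flink code. It assumes a concurrent set from `ConcurrentHashMap.newKeySet()`, whose `add` atomically returns `false` if the key is already present. The key is released when the returned future completes, whether it succeeds or fails. If everything (including the callback that removes the job ID) instead runs in the endpoint's main thread, a plain `HashSet` would be enough.

```java
import java.util.ConcurrentModificationException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical per-key guard that rejects overlapping updates; not part of Flink. */
public class UpdateGuard<K> {
    // Thread-safe set: add() atomically reports whether the key was already present.
    private final Set<K> pending = ConcurrentHashMap.newKeySet();

    /**
     * Runs the update unless another update for the same key is still in flight.
     * The key is released when the returned future completes, successfully or not.
     */
    public <T> CompletableFuture<T> runExclusively(K key, Supplier<CompletableFuture<T>> update) {
        if (!pending.add(key)) {
            CompletableFuture<T> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(
                    new ConcurrentModificationException(
                            String.format("Another update for [%s] is in progress.", key)));
            return rejected;
        }
        CompletableFuture<T> result;
        try {
            result = update.get();
        } catch (RuntimeException e) {
            // The supplier threw before producing a future: release the key immediately.
            pending.remove(key);
            throw e;
        }
        // whenComplete runs on success and on failure, so the key is always released.
        return result.whenComplete((value, error) -> pending.remove(key));
    }

    public boolean isPending(K key) {
        return pending.contains(key);
    }

    public static void main(String[] args) {
        UpdateGuard<String> guard = new UpdateGuard<>();
        CompletableFuture<String> inFlight = new CompletableFuture<>();

        CompletableFuture<String> first = guard.runExclusively("job-1", () -> inFlight);
        CompletableFuture<String> second =
                guard.runExclusively("job-1", () -> CompletableFuture.completedFuture("ignored"));
        System.out.println("second rejected: " + second.isCompletedExceptionally()); // true

        inFlight.complete("ack");
        System.out.println("first result: " + first.join()); // ack
        System.out.println("still pending: " + guard.isPending("job-1")); // false
    }
}
```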



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            String.format(
+                                    "Another update to the job [%s] resource requirements is in progress.",
+                                    jobId)));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   I tend to agree with both points (persist failure fails the whole operation; persist should happen before apply).
   
   Although, if we persist before applying, then we should validate the requirements first (validation currently resides in `jobMasterGateway.updateJobResourceRequirements`).
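   A minimal sketch of the ordering discussed here: validate, then persist, then apply. A persistence failure fails the whole returned future. All names are hypothetical. Requirements are modelled as a map from vertex name to desired parallelism. Persisting and applying are plain callbacks, whereas in the PR applying is an RPC to the JobMaster that returns a future. The sketch ignores that difference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical validate, persist, apply pipeline; all names are illustrative. */
public class PersistBeforeApply {

    /**
     * Requirements are modelled as vertex name to desired parallelism (an assumption made
     * for brevity). The returned future fails if validation or persistence fails, and
     * apply is only reached after a successful persist.
     */
    public static CompletableFuture<Void> update(
            Map<String, Integer> requirements,
            Consumer<Map<String, Integer>> persist,
            Consumer<Map<String, Integer>> apply) {
        return CompletableFuture.completedFuture(requirements)
                // 1. Validate first, so invalid requirements are never written to the store.
                .thenApply(PersistBeforeApply::validate)
                // 2. Persist; an exception here fails the whole operation and skips step 3.
                .thenApply(
                        r -> {
                            persist.accept(r);
                            return r;
                        })
                // 3. Apply only requirements that are already durable, so recovery sees them.
                .thenAccept(apply);
    }

    private static Map<String, Integer> validate(Map<String, Integer> requirements) {
        for (Map.Entry<String, Integer> entry : requirements.entrySet()) {
            if (entry.getValue() == null || entry.getValue() <= 0) {
                throw new IllegalArgumentException(
                        "Parallelism of " + entry.getKey() + " must be positive.");
            }
        }
        return requirements;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        update(Map.of("source", 2), r -> log.add("persisted"), r -> log.add("applied")).join();
        System.out.println(log); // [persisted, applied]

        log.clear();
        CompletableFuture<Void> failed =
                update(
                        Map.of("source", 2),
                        r -> {
                            throw new IllegalStateException("store unavailable");
                        },
                        r -> log.add("applied"));
        System.out.println(failed.isCompletedExceptionally() + " " + log); // true []
    }
}
```

   One consequence of this ordering: if applying fails after a successful persist, the stored requirements are ahead of the applied ones. The sketch does not handle that case.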



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future which contains the current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        throw new UnsupportedOperationException("Operation is not yet implemented.");
+    }
+
+    /**
+     * Update {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job the given requirements belong to
+     * @param jobResourceRequirements new resource requirements for the job
+     * @return Future which is completed successfully when requirements are updated
+     */
+    default CompletableFuture<Acknowledge> updateJobResourceRequirements(

Review Comment:
   Would it make sense to clarify the following?
   1. When the future completes, the requirements have been persisted and applied, but might not be satisfied yet.
   2. The semantics, as discussed in this [thread](https://github.com/apache/flink/pull/22296/files#r1152951671).
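   One possible wording for point 1, shown on a hypothetical, simplified interface. `RequirementsGateway` and `updateRequirements` are illustrative names, not Flink types. The semantics from the linked thread are not reproduced here. The note on concurrent updates mirrors the `ConcurrentModificationException` check in the diff above.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified gateway used only to illustrate the suggested Javadoc wording. */
public interface RequirementsGateway {

    /**
     * Update the resource requirements for a given job.
     *
     * <p>The returned future completes successfully once the new requirements have been
     * persisted and applied. Completion does not imply that the requirements are already
     * satisfied: acquiring the requested resources can take longer.
     *
     * <p>If another update for the same job is still in progress, the future completes
     * exceptionally with a {@link java.util.ConcurrentModificationException}.
     *
     * @param jobId job the given requirements belong to
     * @param parallelism new desired parallelism for the job (simplified for this sketch)
     * @return future which completes once the requirements are persisted and applied
     */
    CompletableFuture<Void> updateRequirements(String jobId, int parallelism);
}
```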



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future which contains the current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {

Review Comment:
   In the parent `RestfulGateway` interface, most method names start with `request` (although not all).
   So I guess it would be more consistent to start this one with `request` too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
