Posted to issues@flink.apache.org by "zentol (via GitHub)" <gi...@apache.org> on 2023/03/29 09:47:30 UTC

[GitHub] [flink] zentol opened a new pull request, #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

zentol opened a new pull request, #22296:
URL: https://github.com/apache/flink/pull/22296

   Extends the support of querying/updating resource requirements to the Dispatcher layer.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gyfora commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153146215


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   sounds good :)





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1154221880


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId, JobMasterGateway::getMaxParallelismPerVertex)
+                .thenAccept(

Review Comment:
   Good point!





[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153058503


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {

Review Comment:
   >  Is this method always going to be executed by the same (main?) thread?
   
   yes





[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1154129462


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId, JobMasterGateway::getMaxParallelismPerVertex)
+                .thenAccept(

Review Comment:
   needs tests verifying that a validation error skips persist/apply



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId, JobMasterGateway::getMaxParallelismPerVertex)
+                .thenAccept(
+                        maxParallelismPerJobVertex ->
+                                validateMaxParallelism(
+                                        jobResourceRequirements, maxParallelismPerJobVertex))
+                .thenRunAsync(

Review Comment:
   needs tests verifying that a persist error skips apply





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153123314


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   Makes sense, I'd be totally fine with that.





[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153479180


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));

Review Comment:
   > How about throwing RestHandlerException here, maybe with code 409 Conflict?
   
   good idea
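
A minimal sketch of the suggestion above, assuming a REST-aware exception that carries an HTTP status. `HttpStatusException` and `ConflictSketch` are hypothetical stand-ins written for illustration; they are not Flink's `RestHandlerException`. The sketch also formats the job ID into the message, since the string in the quoted diff contains a `%s` placeholder but is passed to the exception constructor as-is.

```java
/** Hypothetical stand-in for a REST-aware exception; not Flink's RestHandlerException. */
class HttpStatusException extends RuntimeException {
    private final int statusCode;

    HttpStatusException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }

    int getStatusCode() {
        return statusCode;
    }
}

/** Hypothetical helper that builds the rejection for a concurrent update. */
class ConflictSketch {
    // 409 Conflict: the request clashes with the current state of the resource.
    static final int HTTP_CONFLICT = 409;

    static HttpStatusException concurrentUpdate(String jobId) {
        // String.format fills the %s placeholder with the job ID.
        String message =
                String.format(
                        "Another update to the job [%s] resource requirements is in progress.",
                        jobId);
        return new HttpStatusException(message, HTTP_CONFLICT);
    }
}
```

A REST layer could then translate `getStatusCode()` directly into the response status instead of reporting a generic server error.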





[GitHub] [flink] zentol merged pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol merged PR #22296:
URL: https://github.com/apache/flink/pull/22296




[GitHub] [flink] gyfora commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "gyfora (via GitHub)" <gi...@apache.org>.
gyfora commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1152949572


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future that contains current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {

Review Comment:
   Would it make sense to call this simply `getJobResourceRequirements` instead?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   Wouldn't it make sense to try to write them before applying them? Maybe it's better to not apply them at all than to apply them and lose them on failover.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1152990278


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future that contains current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        throw new UnsupportedOperationException("Operation is not yet implemented.");
+    }
+
+    /**
+     * Update {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job the given requirements belong to
+     * @param jobResourceRequirements new resource requirements for the job
+     * @return Future which is completed successfully when requirements are updated
+     */
+    default CompletableFuture<Acknowledge> updateJobResourceRequirements(

Review Comment:
   Would it make sense to clarify the following?
   1. When the future is completed, the requirements are persisted and applied, but might not be satisfied yet
   2. Semantics, as discussed in [thread](https://github.com/apache/flink/pull/22296#discussion_r1152951671)





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1152994060


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future that contains current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {

Review Comment:
   In the parent `RestfulGateway` class, most methods are called `requestXxx` (although not all).
   So I guess it would be more consistent to start this one with `request` too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22296:
URL: https://github.com/apache/flink/pull/22296#issuecomment-1488292707

   ## CI report:
   
   * a8d585e8fb04fc678d7fdb3d3d4859a3ae009752 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153130025


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   actually, moving it to the dispatcher now should be simple enough, and we can later pull it up into the rest handlers.





[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153107044


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   I will move the validation to the rest handlers, and while I'm at it also fix FLINK-31672.
   
   Since the rest handlers will be added in the next PR, would it be alright to keep it as-is for now and change this later?
   I don't want to change the JobMaster interface now to support this order of operations only to remove it in the very next PR.





[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1154104747


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -944,6 +940,19 @@ public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNo
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<Map<JobVertexID, Integer>> getMaxParallelismPerVertex() {
+        final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            maxParallelismPerVertex.put(
+                    vertex.getID(),
+                    vertex.getMaxParallelism() != JobVertex.MAX_PARALLELISM_DEFAULT
+                            ? vertex.getMaxParallelism()

Review Comment:
   Should add a test that this now respects the user-specified max parallelism





[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153067234


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   So the order would be validate -> persist -> apply? That sounds reasonable to me.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1152987364


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {

Review Comment:
   The set is currently not thread-safe.
   Is this method always going to be executed by the same (main?) thread?
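
One way to make the question moot is to make the guard itself thread-safe. A minimal sketch, assuming a concurrent set is acceptable here; `PendingUpdateGuard` and its methods are hypothetical and not part of the PR, and `String` stands in for Flink's `JobID`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-job guard; String stands in for Flink's JobID. */
final class PendingUpdateGuard {
    // ConcurrentHashMap.newKeySet() returns a thread-safe Set whose add() is atomic,
    // so two threads can never both "win" the guard for the same job.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();

    /** Returns true if the caller acquired the guard for this job. */
    boolean tryAcquire(String jobId) {
        return pending.add(jobId);
    }

    /** Releases the guard once the update has finished, successfully or not. */
    void release(String jobId) {
        pending.remove(jobId);
    }
}
```

If the method is only ever invoked on the main thread, a plain `HashSet` would be enough and this is unnecessary.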



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   I tend to agree with both points (persist failure fails the whole operation; persist should happen before apply).
   
   However, if we persist before applying, then we should validate the requirements first (validation currently resides in `jobMasterGateway.updateJobResourceRequirements`).
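
A minimal sketch of the validate -> persist -> apply ordering, assuming each step can be modelled as a stage on a `CompletableFuture` chain. `UpdatePipeline`, its `steps` list, and the `persistFails` flag are hypothetical and exist only to make the ordering observable; they are not part of the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical illustration of the validate -> persist -> apply ordering. */
final class UpdatePipeline {
    /** Steps that ran, recorded only so the ordering can be observed. */
    final List<String> steps = new ArrayList<>();

    CompletableFuture<Void> update(int requested, int max, boolean persistFails) {
        return CompletableFuture.completedFuture(null)
                // 1. validate: reject bad input before touching any state
                .thenRun(() -> {
                    if (requested > max) {
                        throw new IllegalArgumentException(
                                "parallelism " + requested + " exceeds max " + max);
                    }
                    steps.add("validate");
                })
                // 2. persist: a failure here fails the whole operation
                .thenRun(() -> {
                    if (persistFails) {
                        throw new CompletionException(
                                "could not persist; nothing was applied",
                                new IOException("store unavailable"));
                    }
                    steps.add("persist");
                })
                // 3. apply: only reached if both earlier stages succeeded
                .thenRun(() -> steps.add("apply"));
    }
}
```

With this ordering, a failed validation or a failed persist leaves nothing applied, so the error message no longer has to describe a half-applied state.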



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future which that contains current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        throw new UnsupportedOperationException("Operation is not yet implemented.");
+    }
+
+    /**
+     * Update {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job the given requirements belong to
+     * @param jobResourceRequirements new resource requirements for the job
+     * @return Future which is completed successfully when requirements are updated
+     */
+    default CompletableFuture<Acknowledge> updateJobResourceRequirements(

Review Comment:
   Would it make sense to clarify the following?
   1. When the future is completed, the requirements are persisted and applied, but might not be satisfied yet
   2. Semantics, as discussed in [thread](https://github.com/apache/flink/pull/22296/files#r1152951671)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java:
##########
@@ -108,4 +109,26 @@ default CompletableFuture<String> stopWithSavepointAndGetLocation(
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements} for a given job.
+     *
+     * @param jobId job to read the resource requirements for
+     * @return Future which that contains current resource requirements.
+     */
+    default CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {

Review Comment:
   In the parent `RestfulGateway` interface, most (although not all) method names start with `request`.
   So I guess it would be more consistent for this one to start with `request` too.



[GitHub] [flink] zentol commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153296770


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1123,45 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId,
+                        jobMasterGateway ->
+                                jobMasterGateway.updateJobResourceRequirements(
+                                        jobResourceRequirements))
+                .thenApplyAsync(
+                        ack -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "We could not persist the resource requirements."
+                                                + " They're in effect, but we might not be able"
+                                                + " to recover them after failover. Please retry.",
+                                        e);
+                            }
+                            return ack;

Review Comment:
   To make this work properly I had to extend the JobMasterGateway to expose the max parallelism per vertex.



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22296: [FLINK-31468] Add mechanics to the Dispatcher for handling job resource requirements

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153443702


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));

Review Comment:
   The error handling is now a bit inconsistent:
   - here we throw a regular `ConcurrentModificationException` (so the HTTP client will get an HTTP 500?)
   - in `validateMaxParallelism`, we throw `RestHandlerException`
   
   How about throwing `RestHandlerException` here, maybe with code `409 Conflict`?
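
A minimal sketch of the idea using a stand-in exception type. `HttpStatusException` and `ConflictSketch` are hypothetical; the former only mimics an exception that carries an HTTP status code, which is the role `RestHandlerException` would play here, and `String` stands in for `JobID`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an exception that carries an HTTP status code. */
final class HttpStatusException extends Exception {
    final int status;

    HttpStatusException(String message, int status) {
        super(message);
        this.status = status;
    }
}

final class ConflictSketch {
    static final int CONFLICT = 409;

    private final Set<String> pendingUpdates = new HashSet<>();

    CompletableFuture<String> update(String jobId) {
        if (!pendingUpdates.add(jobId)) {
            // Fail with an explicit status instead of a generic exception,
            // and format the job ID into the message.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new HttpStatusException(
                            String.format(
                                    "Another update to the job [%s] resource requirements is in progress.",
                                    jobId),
                            CONFLICT));
            return failed;
        }
        // A real implementation would start the update here and release
        // the guard on completion; the sketch just returns a pending future.
        return new CompletableFuture<>();
    }
}
```

Whether 409 is the right status, and how the REST layer unwraps the exception, is for the PR to decide; the sketch only shows the failure carrying a status instead of surfacing as a generic server error.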



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId, JobMasterGateway::getMaxParallelismPerVertex)
+                .thenAccept(
+                        maxParallelismPerJobVertex ->
+                                validateMaxParallelism(
+                                        jobResourceRequirements, maxParallelismPerJobVertex))
+                .thenRunAsync(
+                        () -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "The resource requirements could not be persisted and have not been applied. Please retry.",
+                                        e);
+                            }
+                        },
+                        ioExecutor)
+                .thenComposeAsync(
+                        ignored ->
+                                performOperationOnJobMasterGateway(
+                                        jobId,
+                                        jobMasterGateway ->
+                                                jobMasterGateway.updateJobResourceRequirements(
+                                                        jobResourceRequirements)),
+                        getMainThreadExecutor())
+                .whenComplete((ack, error) -> pendingJobResourceRequirementsUpdates.remove(jobId));

Review Comment:
   **nit**: log the error at debug level?
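
Roughly what I have in mind, as a self-contained sketch. It uses `java.util.logging` at `FINE` only so that it runs on its own; in the Dispatcher this would be the existing logger at debug level. `CleanupWithLogging` and `track` are hypothetical names, and `String` stands in for `JobID`.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: log the failure before releasing the per-job guard. */
final class CleanupWithLogging {
    private static final Logger LOG = Logger.getLogger(CleanupWithLogging.class.getName());

    final Set<String> pending = ConcurrentHashMap.newKeySet();

    <T> CompletableFuture<T> track(String jobId, CompletableFuture<T> update) {
        pending.add(jobId);
        return update.whenComplete(
                (ack, error) -> {
                    if (error != null) {
                        // FINE is the java.util.logging counterpart of a debug level.
                        LOG.log(
                                Level.FINE,
                                "Updating resource requirements failed for job " + jobId,
                                error);
                    }
                    // Release the guard on both success and failure.
                    pending.remove(jobId);
                });
    }
}
```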



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -1164,6 +1153,189 @@ public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
                                         .hasMessage("Test exception."));
     }
 
+    @Test
+    public void testInvalidResourceRequirementsUpdate() throws Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        JobMasterServiceLeadershipRunnerFactory.INSTANCE);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        // We can try updating the JRR once the scheduler has been started.
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final JobStatus status =
+                            dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get();
+                    // need to check for CREATED in case adaptive scheduler is used
+                    return status == JobStatus.CREATED || status == JobStatus.RUNNING;
+                });
+
+        assertThatFuture(
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, JobResourceRequirements.empty()))
+                .eventuallyFailsWith(ExecutionException.class)
+                .withCauseInstanceOf(RestHandlerException.class);
+    }
+
+    @Test
+    public void testJobResourceRequirementsCanBeOnlyUpdatedOnInitializedJobMasters()
+            throws Exception {
+        final JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster =
+                new JobManagerRunnerWithBlockingJobMasterFactory(
+                        this::withMaxParallelismPerVertexResponse);
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        assertThatFuture(
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, JobResourceRequirements.empty()))
+                .eventuallyFailsWith(ExecutionException.class)
+                .withCauseInstanceOf(FlinkJobNotFoundException.class);
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        blockingJobMaster.waitForBlockingInit();
+
+        try {
+            assertThatFuture(
+                            dispatcherGateway.updateJobResourceRequirements(
+                                    jobId, JobResourceRequirements.empty()))
+                    .eventuallyFailsWith(ExecutionException.class)
+                    .withCauseInstanceOf(UnavailableDispatcherOperationException.class);
+        } finally {
+            // Unblocking the job master in the "finally block" prevents getting
+            // stuck during the RPC system tear down in case of test failure.
+            blockingJobMaster.unblockJobMasterInitialization();
+        }
+
+        // We can update the JRR once the job transitions to RUNNING.
+        awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+        assertThatFuture(
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, getJobRequirements()))
+                .eventuallySucceeds();
+    }
+
+    @Test
+    public void testJobResourceRequirementsAreGuardedAgainstConcurrentModification()
+            throws Exception {
+        final CompletableFuture<Acknowledge> blockedUpdatesToJobMasterFuture =
+                new CompletableFuture<>();
+        final JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster =
+                new JobManagerRunnerWithBlockingJobMasterFactory(
+                        builder ->
+                                withMaxParallelismPerVertexResponse(builder)
+                                        .setUpdateJobResourceRequirementsFunction(
+                                                jobResourceRequirements ->
+                                                        blockedUpdatesToJobMasterFuture));
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        // We intentionally perform the test on two jobs to make sure the
+        // concurrent modification is only prevented on the per-job level.
+        final JobGraph firstJobGraph = InstantiationUtil.clone(jobGraph);
+        firstJobGraph.setJobID(JobID.generate());
+        final JobGraph secondJobGraph = InstantiationUtil.clone(jobGraph);
+        secondJobGraph.setJobID(JobID.generate());
+
+        final CompletableFuture<?> firstPendingUpdateFuture =
+                testConcurrentModificationIsPrevented(
+                        dispatcherGateway, blockingJobMaster, firstJobGraph);
+        Assertions.assertThat(firstPendingUpdateFuture).isNotCompleted();
+        final CompletableFuture<?> secondPendingUpdateFuture =
+                testConcurrentModificationIsPrevented(
+                        dispatcherGateway, blockingJobMaster, secondJobGraph);
+        Assertions.assertThat(secondPendingUpdateFuture).isNotCompleted();
+
+        blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
+        assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
+        assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();

Review Comment:
   **nit**: move both `isNotCompleted` assertions closer to the `complete` call, to make it clear why the futures shouldn't be completed yet?
   
   ```
           final CompletableFuture<?> firstPendingUpdateFuture =
                   testConcurrentModificationIsPrevented(
                           dispatcherGateway, blockingJobMaster, firstJobGraph);
           final CompletableFuture<?> secondPendingUpdateFuture =
                   testConcurrentModificationIsPrevented(
                           dispatcherGateway, blockingJobMaster, secondJobGraph);
   
           assertThat(firstPendingUpdateFuture).isNotCompleted();
           assertThat(secondPendingUpdateFuture).isNotCompleted();
           blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
           assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
           assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();
   ```


