Posted to issues@flink.apache.org by "zentol (via GitHub)" <gi...@apache.org> on 2023/03/28 12:09:50 UTC

[GitHub] [flink] zentol opened a new pull request, #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

zentol opened a new pull request, #22288:
URL: https://github.com/apache/flink/pull/22288

   Extends support for querying and updating job resource requirements to the JobMaster layer.
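A minimal sketch of the validate-then-forward flow used by the `updateJobResourceRequirements` method quoted in the review comments of this PR, in plain Java. It assumes vertices are identified by strings and that each requirement is a hypothetical `ParallelismBounds` pair. The specific checks (lower bound at least 1, lower bound not above upper bound, upper bound not above the vertex's max parallelism) are illustrative assumptions, not the actual rules of Flink's `JobResourceRequirements.validate`. The `maxParallelismPerVertex` map plays the role of the map the JobMaster builds with `SchedulerBase.getDefaultMaxParallelism`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for per-vertex parallelism bounds; not Flink's real class.
final class ParallelismBounds {
    final int lowerBound;
    final int upperBound;

    ParallelismBounds(int lowerBound, int upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }
}

final class RequirementsSketch {
    // Returns one human-readable error per invalid vertex; an empty list means "valid".
    static List<String> validate(
            Map<String, ParallelismBounds> requirements,
            Map<String, Integer> maxParallelismPerVertex) {
        final List<String> errors = new ArrayList<>();
        for (Map.Entry<String, ParallelismBounds> entry : requirements.entrySet()) {
            final String vertexId = entry.getKey();
            final ParallelismBounds bounds = entry.getValue();
            final Integer maxParallelism = maxParallelismPerVertex.get(vertexId);
            if (maxParallelism == null) {
                errors.add("Unknown vertex " + vertexId);
            } else if (bounds.lowerBound < 1 || bounds.lowerBound > bounds.upperBound) {
                errors.add("Invalid bounds for vertex " + vertexId);
            } else if (bounds.upperBound > maxParallelism) {
                errors.add("Upper bound exceeds max parallelism for vertex " + vertexId);
            }
        }
        return errors;
    }

    // Same shape as the JobMaster method: validate first, only then hand off to the scheduler.
    static CompletableFuture<Void> update(
            Map<String, ParallelismBounds> requirements,
            Map<String, Integer> maxParallelismPerVertex,
            Map<String, ParallelismBounds> schedulerState) {
        final List<String> errors = validate(requirements, maxParallelismPerVertex);
        if (!errors.isEmpty()) {
            final CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException(String.join(", ", errors)));
            return failed;
        }
        // Stand-in for schedulerNG.updateJobResourceRequirements(...).
        schedulerState.putAll(requirements);
        return CompletableFuture.completedFuture(null);
    }
}
```

In this sketch an invalid request never reaches the (simulated) scheduler state, and the failure is reported through the returned future rather than a thrown exception, which fits callers that already consume results asynchronously.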


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22288:
URL: https://github.com/apache/flink/pull/22288#discussion_r1151567502


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -937,6 +944,34 @@ public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNo
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements() {
+        return CompletableFuture.completedFuture(schedulerNG.requestJobResourceRequirements());
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements) {
+        final Map<JobVertexID, Integer> maxParallelismPerJobVertex = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            maxParallelismPerJobVertex.put(
+                    vertex.getID(), SchedulerBase.getDefaultMaxParallelism(vertex));
+        }
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(
+                        jobResourceRequirements, maxParallelismPerJobVertex);
+        if (validationErrors.isEmpty()) {
+            schedulerNG.updateJobResourceRequirements(jobResourceRequirements);

Review Comment:
   I see, thanks for clarifying.





[GitHub] [flink] zentol commented on a diff in pull request #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22288:
URL: https://github.com/apache/flink/pull/22288#discussion_r1151538468


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -937,6 +944,34 @@ public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNo
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements() {
+        return CompletableFuture.completedFuture(schedulerNG.requestJobResourceRequirements());
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements) {
+        final Map<JobVertexID, Integer> maxParallelismPerJobVertex = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            maxParallelismPerJobVertex.put(
+                    vertex.getID(), SchedulerBase.getDefaultMaxParallelism(vertex));
+        }
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(
+                        jobResourceRequirements, maxParallelismPerJobVertex);
+        if (validationErrors.isEmpty()) {
+            schedulerNG.updateJobResourceRequirements(jobResourceRequirements);

Review Comment:
   The UI will get buttons for rescaling, and we'd like to hide them if rescaling isn't supported by the current cluster configuration. This means we already check for support in the REST layer.
   Once we do that, it's little additional effort to also make the handlers fail immediately with an error.
   That way we short-circuit a request that we know would fail anyway, and centralize the decision of whether rescaling is supported in one place.
   
   If we weren't hiding the buttons I'd agree with you (and in fact I implemented that until I remembered about the button hiding :see_no_evil: ).
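The REST-layer short-circuit described above could look roughly like the following sketch. `RescaleHandlerSketch`, `HttpStatusException` and the `rescalingSupported` flag are hypothetical names, the `Function` stands in for the call to the JobMaster gateway, and status 501 is only an assumption (the thread does not settle on a status code).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical exception carrying an HTTP status code, standing in for a REST-layer error type.
final class HttpStatusException extends RuntimeException {
    final int status;

    HttpStatusException(String message, int status) {
        super(message);
        this.status = status;
    }
}

final class RescaleHandlerSketch<R> {
    private final boolean rescalingSupported; // decided once, from the cluster configuration
    private final Function<R, CompletableFuture<Void>> gateway; // stand-in for the JobMaster RPC

    RescaleHandlerSketch(boolean rescalingSupported, Function<R, CompletableFuture<Void>> gateway) {
        this.rescalingSupported = rescalingSupported;
        this.gateway = gateway;
    }

    CompletableFuture<Void> handle(R requirements) {
        if (!rescalingSupported) {
            // Short-circuit: the request would fail further down anyway, so never reach the JobMaster.
            final CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new HttpStatusException("Rescaling is not supported by this cluster.", 501));
            return failed;
        }
        return gateway.apply(requirements);
    }
}
```

The same flag that hides the UI buttons drives the handler, so the "is this supported?" decision lives in one place and the lower layers need no extra capability query.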





[GitHub] [flink] rkhachatryan commented on a diff in pull request #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

Posted by "rkhachatryan (via GitHub)" <gi...@apache.org>.
rkhachatryan commented on code in PR #22288:
URL: https://github.com/apache/flink/pull/22288#discussion_r1150621201


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -937,6 +944,34 @@ public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNo
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements() {
+        return CompletableFuture.completedFuture(schedulerNG.requestJobResourceRequirements());
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements) {
+        final Map<JobVertexID, Integer> maxParallelismPerJobVertex = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            maxParallelismPerJobVertex.put(
+                    vertex.getID(), SchedulerBase.getDefaultMaxParallelism(vertex));
+        }
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(
+                        jobResourceRequirements, maxParallelismPerJobVertex);
+        if (validationErrors.isEmpty()) {
+            schedulerNG.updateJobResourceRequirements(jobResourceRequirements);

Review Comment:
   What is the benefit of handling this error in the DefaultScheduler rather than here?
   Something like
   ```
           try {
               schedulerNG.updateJobResourceRequirements(jobResourceRequirements);
               return CompletableFuture.completedFuture(Acknowledge.get());
           } catch (UnsupportedOperationException e) {
               return FutureUtils.completedExceptionally(
                   new RestHandlerException(e.getMessage(), HttpResponseStatus.NOT_IMPLEMENTED)); // not sure about the status code
           }
   ```
   Handling `BAD_REQUEST` and `NOT_IMPLEMENTED` (?) in one place seems more consistent to me.
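A self-contained sketch of the alternative proposed here: a single method that maps validation failures to 400 (`BAD_REQUEST`) and an `UnsupportedOperationException` from the scheduler to 501 (`NOT_IMPLEMENTED`). `StatusCodeException` is a hypothetical stand-in for `RestHandlerException`, and the `Consumer` stands in for `schedulerNG`; this illustrates the suggestion, not what ended up in the PR.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical error type carrying an HTTP status; stands in for RestHandlerException.
final class StatusCodeException extends RuntimeException {
    final int statusCode;

    StatusCodeException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }
}

final class CentralizedErrorHandlingSketch {
    static final int BAD_REQUEST = 400;
    static final int NOT_IMPLEMENTED = 501;

    // Both failure modes are translated into a status code in this single method.
    static <R> CompletableFuture<Void> update(
            R requirements, List<String> validationErrors, Consumer<R> scheduler) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (!validationErrors.isEmpty()) {
            result.completeExceptionally(
                    new StatusCodeException(String.join(", ", validationErrors), BAD_REQUEST));
            return result;
        }
        try {
            // May throw if the scheduler implementation does not support updates.
            scheduler.accept(requirements);
            result.complete(null);
        } catch (UnsupportedOperationException e) {
            result.completeExceptionally(new StatusCodeException(e.getMessage(), NOT_IMPLEMENTED));
        }
        return result;
    }
}
```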





[GitHub] [flink] zentol commented on a diff in pull request #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22288:
URL: https://github.com/apache/flink/pull/22288#discussion_r1150502154


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -937,6 +944,34 @@ public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNo
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements() {
+        return CompletableFuture.completedFuture(schedulerNG.requestJobResourceRequirements());
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements) {
+        final Map<JobVertexID, Integer> maxParallelismPerJobVertex = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            maxParallelismPerJobVertex.put(
+                    vertex.getID(), SchedulerBase.getDefaultMaxParallelism(vertex));
+        }
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(
+                        jobResourceRequirements, maxParallelismPerJobVertex);
+        if (validationErrors.isEmpty()) {
+            schedulerNG.updateJobResourceRequirements(jobResourceRequirements);

Review Comment:
   We will handle the DefaultScheduler's lack of support for this directly in the REST layer.
   We won't even call this method if the adaptive scheduler isn't used.
   (The idea is to determine this limitation early on so that we can expose it via the config endpoint; we don't want to wire a `supportsXXX` call through all layers.)
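A sketch of deciding the limitation once from configuration and exposing it to the UI, under stated assumptions: `jobmanager.scheduler: adaptive` is the Flink option that selects the Adaptive Scheduler, but the check below ignores other ways of enabling it (such as reactive mode), and the `rescaling` key in the response map is a hypothetical field, not the actual config endpoint schema.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class ClusterFeaturesSketch {
    // Flink option used to select the Adaptive Scheduler ("adaptive").
    static final String SCHEDULER_KEY = "jobmanager.scheduler";

    // Decide once, at startup, whether rescaling through the REST API is possible.
    // Simplification: other ways of enabling the adaptive scheduler are not considered.
    static boolean rescalingSupported(Map<String, String> configuration) {
        return "adaptive".equalsIgnoreCase(configuration.getOrDefault(SCHEDULER_KEY, "default"));
    }

    // Hypothetical payload for the config endpoint that the UI could read to hide its buttons.
    static Map<String, Object> configResponse(Map<String, String> configuration) {
        final Map<String, Object> features = new LinkedHashMap<>();
        features.put("rescaling", rescalingSupported(configuration));
        return Collections.unmodifiableMap(features);
    }
}
```

Because the flag is derived from configuration at the REST layer, neither the JobMaster nor the scheduler needs a capability method for the UI to know whether to show the rescaling controls.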





[GitHub] [flink] zentol merged pull request #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol merged PR #22288:
URL: https://github.com/apache/flink/pull/22288




[GitHub] [flink] flinkbot commented on pull request #22288: [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements.

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22288:
URL: https://github.com/apache/flink/pull/22288#issuecomment-1486775467

   ## CI report:
   
   * b37939400499ee985215eeba9823b02c0d998309 UNKNOWN
   

