You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "rkhachatryan (via GitHub)" <gi...@apache.org> on 2023/03/15 00:36:54 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #22169: [FLINK-31399] AdaptiveScheduler is able to handle changes in job resource requirements.

rkhachatryan commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1136373675


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -123,9 +123,18 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
     }
 
     @Override
-    public void notifyNewResourcesAvailable() {
-        if (context.canScaleUp(getExecutionGraph())) {
-            getLogger().info("New resources are available. Restarting job to scale up.");
+    public void onNewResourcesAvailable() {
+        maybeRescale();
+    }
+
+    @Override
+    public void onNewResourceRequirements() {
+        maybeRescale();
+    }

Review Comment:
   1. Should there be a timeout to eventually satisfy the requirements? (similar to one in `WaitingForResources`)
   I guess in `onNewResourcesAvailable()`, new resources can be ignored indefinitely, but we can't afford that for requirements?
   
   2. Is `maybeRescale()` indeed suitable for new requirements?
   In case when some vertices were up-scaled and some downscaled, the cumulative parallelism won't change (that's what is eventually checked in `AdaptiveScheduler.shouldRescale` from `maybeRescale`).
   Or am I missing something?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingController.java:
##########
@@ -25,16 +25,17 @@
  * Simple scaling policy for a reactive mode. The user can configure a minimum cumulative
  * parallelism increase to allow a scale up.
  */
-public class ReactiveScaleUpController implements ScaleUpController {
+public class EnforceMinimalIncreaseRescalingController implements RescalingController {
 
     private final int minParallelismIncrease;
 
-    public ReactiveScaleUpController(Configuration configuration) {
+    public EnforceMinimalIncreaseRescalingController(Configuration configuration) {
         minParallelismIncrease = configuration.get(MIN_PARALLELISM_INCREASE);
     }
 
     @Override
-    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
-        return newCumulativeParallelism - currentCumulativeParallelism >= minParallelismIncrease;
+    public boolean shouldRescale(int currentCumulativeParallelism, int newCumulativeParallelism) {
+        return Math.abs(newCumulativeParallelism - currentCumulativeParallelism)
+                >= minParallelismIncrease;

Review Comment:
   1. nit: `minParallelismIncrease` => `minParallelismChange`
   2. Or maybe it's better to have both minIncrease and minDecrease? At least no need to **rename** the config option :slightly_smiling_face: 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());

Review Comment:
   Here, the lower bound is always `1`, and `SlotAllocator` also ignores it.
   
   Should we throw `UnsupportedOperationException` if the requested lower bound is higher?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {

Review Comment:
   I'm missing storing these new requirements (if they changed).
   Is that done somewhere else?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java:
##########
@@ -144,12 +135,17 @@ public Logger getLogger() {
     }
 
     @Override
-    public void notifyNewResourcesAvailable() {
+    public void onNewResourcesAvailable() {
+        checkDesiredOrSufficientResourcesAvailable();
+    }
+
+    @Override
+    public void onNewResourceRequirements() {
         checkDesiredOrSufficientResourcesAvailable();
     }
 
     private void checkDesiredOrSufficientResourcesAvailable() {
-        if (context.hasDesiredResources(desiredResources)) {
+        if (context.hasDesiredResources()) {

Review Comment:
   not directly related to this PR:
   when `resourceStabilizationDeadline` is not null, should we skip scheduling `checkDesiredOrSufficientResourcesAvailable` (on line 162)?
   Otherwise, we schedule as many checks as there are changes in resources.
   Or am I missing something?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -216,11 +218,16 @@
     private final DeploymentStateTimeMetrics deploymentTimeMetrics;
 
     private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
+    private JobGraphJobInformation jobInformation;
+    private ResourceCounter desiredResources = ResourceCounter.empty();
 
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
+    private final Duration slotIdleTimeout;
+

Review Comment:
   It's actually a check interval, not the timeout, right?
   Should we then call it `slotIdlenessCheckInterval`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java:
##########
@@ -185,4 +187,28 @@ void deliverOperatorEventToCoordinator(
      */
     CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
             OperatorID operator, CoordinationRequest request) throws FlinkException;
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements}.
+     *
+     * @return Current resource requirements.
+     */
+    default JobResourceRequirements requestJobResourceRequirements() {
+        throw new UnsupportedOperationException(

Review Comment:
   IIRC, there were plans to implement `GET` for the default scheduler (please correct me if I'm wrong).
   If it's out of scope of this PR, should we create a ticket and add a `TODO` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            throw new UnsupportedOperationException(
+                    "Cannot change the parallelism of a job running in reactive mode.");
+        }
+        final Optional<VertexParallelismStore> maybeUpdateVertexParallelismStore =
+                DefaultVertexParallelismStore.applyJobResourceRequirements(
+                        jobInformation.getVertexParallelismStore(), jobResourceRequirements);
+        if (maybeUpdateVertexParallelismStore.isPresent()) {
+            this.jobInformation =
+                    new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());
+            declareDesiredResources();
+            state.tryRun(
+                    ResourceListener.class,
+                    ResourceListener::onNewResourceRequirements,
+                    "Current state does not react to desired parallelism changes.");

Review Comment:
   Just to double-check:
   if the cumulative DoP doesn't change,
   - nothing is (re)declared in `declareDesiredResources()` - that's fine (?)
   - but we still want to proceed to `onNewResourceRequirements` check (?)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1213,4 +1268,17 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
     State getState() {
         return state;
     }
+
+    /**
+     * Check for slots that are idle for more than {@link JobManagerOptions#SLOT_IDLE_TIMEOUT} and
+     * release them back to the ResourceManager.
+     */
+    private void checkIdleSlotTimeout() {
+        declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis());
+        getMainThreadExecutor()
+                .schedule(
+                        this::checkIdleSlotTimeout,

Review Comment:
   1. Could you explain why this method is necessary -don't we release the excessive slots in `CreatingExecutionGraph`? What is the scenario?
   
   2.  Should the scheduled task be cancelled, e.g. in case of losing leadership? Otherwise, won't we get as many scheduled tasks as leadership changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org