You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "huwh (via GitHub)" <gi...@apache.org> on 2023/03/20 12:34:40 UTC

[GitHub] [flink] huwh commented on a diff in pull request #22169: [FLINK-31399] AdaptiveScheduler is able to handle changes in job resource requirements.

huwh commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1142025502


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1213,4 +1260,29 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
     State getState() {
         return state;
     }
+
+    /**
+     * Check for slots that are idle for more than {@link JobManagerOptions#SLOT_IDLE_TIMEOUT} and
+     * release them back to the ResourceManager.
+     */
+    private void checkIdleSlotTimeout() {
+        if (getState().getJobStatus().isGloballyTerminalState()) {
+            // Job has reached the terminal state, so we can return all slots to the ResourceManager
+            // to speed things up because we no longer need them. This optimization lets us skip
+            // waiting for the slot pool service to close.
+            for (SlotInfo slotInfo : declarativeSlotPool.getAllSlotsInformation()) {
+                declarativeSlotPool.releaseSlot(
+                        slotInfo.getAllocationId(),
+                        new FlinkException(
+                                "Returning slots to their owners, because the job has reached a globally terminal state."));
+            }
+            return;
+        }
+        declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis());

Review Comment:
   Shall we move the periodic check of idle timeout to DeclarativeSlotPool since both Adaptive scheduler and DeclarativeSlotPoolBridge need it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -184,12 +193,12 @@ CompletableFuture<String> stopWithSavepoint(
         FailureResult howToHandleFailure(Throwable failure);
 
         /**
-         * Asks if we can scale up the currently executing job.
+         * Asks if we should rescale the currently executing job.
          *
          * @param executionGraph executionGraph for making the scaling decision.
-         * @return true, if we can scale up
+         * @return true, if we should scale up

Review Comment:
   scale up -> rescale



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            throw new UnsupportedOperationException(
+                    "Cannot change the parallelism of a job running in reactive mode.");
+        }
+        final Optional<VertexParallelismStore> maybeUpdateVertexParallelismStore =
+                DefaultVertexParallelismStore.applyJobResourceRequirements(
+                        jobInformation.getVertexParallelismStore(), jobResourceRequirements);
+        if (maybeUpdateVertexParallelismStore.isPresent()) {
+            this.jobInformation =
+                    new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());

Review Comment:
   Maybe we need a new SlotAssigner to assign slots from entire task executor first. Otherwise, the resource(task executor) could not release after job scale down.
   
   This could implement in other issues.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingController.java:
##########
@@ -25,16 +25,17 @@
  * Simple scaling policy for a reactive mode. The user can configure a minimum cumulative
  * parallelism increase to allow a scale up.
  */
-public class ReactiveScaleUpController implements ScaleUpController {
+public class EnforceMinimalIncreaseRescalingController implements RescalingController {
 
     private final int minParallelismIncrease;
 
-    public ReactiveScaleUpController(Configuration configuration) {
+    public EnforceMinimalIncreaseRescalingController(Configuration configuration) {
         minParallelismIncrease = configuration.get(MIN_PARALLELISM_INCREASE);
     }
 
     @Override
-    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
-        return newCumulativeParallelism - currentCumulativeParallelism >= minParallelismIncrease;
+    public boolean shouldRescale(int currentCumulativeParallelism, int newCumulativeParallelism) {
+        return Math.abs(newCumulativeParallelism - currentCumulativeParallelism)
+                >= minParallelismIncrease;

Review Comment:
   +1 for add a minDecrease



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org