Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:16:04 UTC

[flink-kubernetes-operator] 05/09: [FLINK-30260][autoscaler] Add scaling execution logic

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 37bf5107a957110cfa4289858f32a0baa00d2dbf
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 09:00:40 2022 +0100

    [FLINK-30260][autoscaler] Add scaling execution logic
---
 .../operator/autoscaler/ScalingExecutor.java       | 362 +++++++++++++++++++++
 .../operator/autoscaler/ScalingSummary.java        |  51 +++
 .../operator/autoscaler/ScalingExecutorTest.java   | 350 ++++++++++++++++++++
 3 files changed, 763 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
new file mode 100644
index 00000000..afd9d2fc
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Class responsible for executing scaling decisions. */
+public class ScalingExecutor implements Cleanup {
+
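+    // The map is rendered as comma-separated key:value pairs; a config entry could look like
+    // (hypothetical vertex IDs):
+    // pipeline.jobvertex-parallelism-overrides: 0a448493b4782967b150582570326227:4,bc764cd8ddf7a0cff126f51c16239658:2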
+    public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
+            ConfigOptions.key("pipeline.jobvertex-parallelism-overrides")
+                    .mapType()
+                    .defaultValue(Collections.emptyMap())
+                    .withDescription(
+                            "A parallelism override map (jobVertexId -> parallelism) which will be used to update"
+                                    + " the parallelism of the corresponding job vertices of submitted JobGraphs.");
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
+
+    private final KubernetesClient kubernetesClient;
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public ScalingExecutor(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    public boolean scaleResource(
+            AbstractFlinkResource<?, ?> resource,
+            AutoScalerInfo scalingInformation,
+            Configuration conf,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics)
+            throws Exception {
+
+        if (!conf.get(SCALING_ENABLED)) {
+            return false;
+        }
+
+        if (!stabilizationPeriodPassed(resource, conf)) {
+            return false;
+        }
+
+        var scalingHistory = scalingInformation.getScalingHistory();
+        var scalingSummaries = computeScalingSummary(conf, evaluatedMetrics, scalingHistory);
+        if (scalingSummaries.isEmpty()) {
+            LOG.info("All job vertices are currently running at their target parallelism.");
+            return false;
+        }
+
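+        // Skip scaling when every vertex with a proposed change is still processing within
+        // its target utilization band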
+        if (allVerticesWithinUtilizationTarget(evaluatedMetrics, scalingSummaries)) {
+            return false;
+        }
+
+        LOG.info("Scaling vertices:");
+        scalingSummaries.forEach(
+                (v, s) ->
+                        LOG.info(
+                                "{} | Parallelism {} -> {}",
+                                v,
+                                s.getCurrentParallelism(),
+                                s.getNewParallelism()));
+
+        setVertexParallelismOverrides(resource, evaluatedMetrics, scalingSummaries);
+
+        KubernetesClientUtils.replaceSpecAfterScaling(kubernetesClient, resource);
+        scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries);
+
+        return true;
+    }
+
+    private boolean stabilizationPeriodPassed(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+        var now = clock.instant();
+        var startTs =
+                Instant.ofEpochMilli(
+                        Long.parseLong(resource.getStatus().getJobStatus().getStartTime()));
+        var stableTime = startTs.plus(conf.get(STABILIZATION_INTERVAL));
+
+        if (stableTime.isAfter(now)) {
+            LOG.info("Waiting until {} to stabilize before new scale operation.", stableTime);
+            return false;
+        }
+        return true;
+    }
+
+    protected static boolean allVerticesWithinUtilizationTarget(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+
+        for (Map.Entry<JobVertexID, ScalingSummary> entry : scalingSummaries.entrySet()) {
+            var vertex = entry.getKey();
+            var scalingSummary = entry.getValue();
+            var metrics = evaluatedMetrics.get(vertex);
+
+            double processingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+            double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+            double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
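+            // Within the target band means scaleUpRateThreshold <= processingRate <= scaleDownRateThreshold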
+            if (processingRate < scaleUpRateThreshold || processingRate > scaleDownRateThreshold) {
+                LOG.info(
+                        "Vertex {}(pCurr={}, pNew={}) processing rate {} is outside ({}, {})",
+                        vertex,
+                        scalingSummary.getCurrentParallelism(),
+                        scalingSummary.getNewParallelism(),
+                        processingRate,
+                        scaleUpRateThreshold,
+                        scaleDownRateThreshold);
+                return false;
+            } else {
+                LOG.debug(
+                        "Vertex {}(pCurr={}, pNew={}) processing rate {} is within target ({}, {})",
+                        vertex,
+                        scalingSummary.getCurrentParallelism(),
+                        scalingSummary.getNewParallelism(),
+                        processingRate,
+                        scaleUpRateThreshold,
+                        scaleDownRateThreshold);
+            }
+        }
+        LOG.info("All vertex processing rates are within target.");
+        return true;
+    }
+
+    private Map<JobVertexID, ScalingSummary> computeScalingSummary(
+            Configuration conf,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
+
+        var out = new HashMap<JobVertexID, ScalingSummary>();
+        evaluatedMetrics.forEach(
+                (v, metrics) -> {
+                    var currentParallelism =
+                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
+                    var newParallelism =
+                            computeScaleTargetParallelism(
+                                    conf,
+                                    v,
+                                    metrics,
+                                    scalingHistory.getOrDefault(v, Collections.emptySortedMap()));
+                    if (currentParallelism != newParallelism) {
+                        out.put(v, new ScalingSummary(currentParallelism, newParallelism, metrics));
+                    }
+                });
+
+        return out;
+    }
+
+    protected int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (!history.isEmpty()) {
+            if (detectImmediateScaleDownAfterScaleUp(
+                    conf, history, currentParallelism, newParallelism)) {
+                LOG.info(
+                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+                        vertex,
+                        currentParallelism);
+                newParallelism = currentParallelism;
+            }
+
+            // TODO: use the scaling history to detect ineffective scalings, e.g.:
+            // currentParallelism = 2, newParallelism = 1, minimumProcRate = 1000 r/s
+            // history: currentParallelism 1 => 3 -> empiricalProcRate = 800
+            // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+        }
+
+        return newParallelism;
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean isScaleDown = newParallelism < currentParallelism;
+        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+
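+        // True while less time than the configured grace period has passed since the last scaling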
+        boolean withinConfiguredTime =
+                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+
+        return isScaleDown && lastScaleUp && withinConfiguredTime;
+    }
+
+    public static int scale(
+            int parallelism,
+            int numKeyGroups,
+            double scaleFactor,
+            int minParallelism,
+            int maxParallelism) {
+        Preconditions.checkArgument(
+                minParallelism <= maxParallelism,
+                "The minimum parallelism must not be greater than the maximum parallelism.");
+        if (minParallelism > numKeyGroups) {
+            LOG.warn(
+                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
+                    minParallelism,
+                    numKeyGroups);
+        }
+        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
+            LOG.warn(
+                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
+                    maxParallelism,
+                    numKeyGroups);
+        }
+
+        int newParallelism =
+                // Prevent integer overflow when converting from double to integer.
+                // We do not have to detect underflow because doubles cannot
+                // underflow.
+                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
+
+        // Cap parallelism at either number of key groups or parallelism limit
+        final int upperBound = Math.min(numKeyGroups, maxParallelism);
+
+        // Apply min/max parallelism
+        newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
+
+        // Try to adjust the parallelism such that it divides the number of key groups without a
+        // remainder => state is evenly spread across subtasks
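+        // e.g. numKeyGroups = 128 and a computed parallelism of 30 is bumped to 32, since
+        // 128 % 32 == 0 (assuming the upper bound permits it)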
+        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+            if (numKeyGroups % p == 0) {
+                return p;
+            }
+        }
+
+        // If key group adjustment fails, use originally computed parallelism
+        return newParallelism;
+    }
+
+    private void setVertexParallelismOverrides(
+            AbstractFlinkResource<?, ?> resource,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> summaries) {
+        var flinkConf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
+        var overrides = new HashMap<String, String>();
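+        // Vertices without a scaling decision get an explicit override at their current
+        // parallelism, so the override map covers every evaluated vertex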
+        evaluatedMetrics.forEach(
+                (id, metrics) -> {
+                    if (summaries.containsKey(id)) {
+                        overrides.put(
+                                id.toHexString(),
+                                String.valueOf(summaries.get(id).getNewParallelism()));
+                    } else {
+                        overrides.put(
+                                id.toHexString(),
+                                String.valueOf(
+                                        (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent()));
+                    }
+                });
+        flinkConf.set(PARALLELISM_OVERRIDES, overrides);
+
+        resource.getSpec().setFlinkConfiguration(flinkConf.toMap());
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+
+    @Override
+    public void cleanup(AbstractFlinkResource<?, ?> cr) {
+        // No cleanup is currently necessary
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
new file mode 100644
index 00000000..7def1716
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/** Scaling summary computed by the {@link ScalingExecutor}. */
+@Data
+@NoArgsConstructor
+public class ScalingSummary {
+
+    private int currentParallelism;
+
+    private int newParallelism;
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> metrics;
+
+    public ScalingSummary(
+            int currentParallelism,
+            int newParallelism,
+            Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
+        if (currentParallelism == newParallelism) {
+            throw new IllegalArgumentException(
+                    "Current parallelism should not be equal to newParallelism during scaling.");
+        }
+        this.currentParallelism = currentParallelism;
+        this.newParallelism = newParallelism;
+        this.metrics = metrics;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
new file mode 100644
index 00000000..e819fd89
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for the scaling execution logic. */
+@EnableKubernetesMockClient(crud = true)
+public class ScalingExecutorTest {
+
+    private ScalingExecutor scalingDecisionExecutor;
+    private Configuration conf;
+    private KubernetesClient kubernetesClient;
+    private FlinkDeployment flinkDep;
+
+    @BeforeEach
+    public void setup() {
+        scalingDecisionExecutor = new ScalingExecutor(kubernetesClient);
+        conf = new Configuration();
+        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+
+        flinkDep = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(flinkDep).createOrReplace();
+        flinkDep.getStatus()
+                .getJobStatus()
+                .setStartTime(String.valueOf(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testStabilizationPeriod() throws Exception {
+        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMinutes(1));
+
+        var metrics = Map.of(new JobVertexID(), evaluated(1, 110, 100));
+
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+        var clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
+        flinkDep.getStatus()
+                .getJobStatus()
+                .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(30));
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(20));
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(20));
+        scalingDecisionExecutor.setClock(clock);
+        assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        flinkDep.getStatus()
+                .getJobStatus()
+                .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(59));
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(2));
+        scalingDecisionExecutor.setClock(clock);
+        assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+    }
+
+    @Test
+    public void testUtilizationBoundaries() {
+        // Restart time should not affect utilization boundary
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+
+        var flinkDep = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(flinkDep).createOrReplace();
+
+        var op1 = new JobVertexID();
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+        var evaluated = Map.of(op1, evaluated(1, 70, 100));
+        var scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
+        assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
+        evaluated = Map.of(op1, evaluated(1, 70, 100));
+        scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
+        assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+        assertNull(getScaledParallelism(flinkDep));
+
+        var op2 = new JobVertexID();
+        evaluated =
+                Map.of(
+                        op1, evaluated(1, 70, 100),
+                        op2, evaluated(1, 85, 100));
+        scalingSummary =
+                Map.of(
+                        op1,
+                        new ScalingSummary(1, 2, evaluated.get(op1)),
+                        op2,
+                        new ScalingSummary(1, 2, evaluated.get(op2)));
+
+        assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+        evaluated =
+                Map.of(
+                        op1, evaluated(1, 70, 100),
+                        op2, evaluated(1, 70, 100));
+        scalingSummary =
+                Map.of(
+                        op1,
+                        new ScalingSummary(1, 2, evaluated.get(op1)),
+                        op2,
+                        new ScalingSummary(1, 2, evaluated.get(op2)));
+        assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+        // Test with backlog based scaling
+        evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
+        scalingSummary = Map.of(op1, new ScalingSummary(1, 2, evaluated.get(op1)));
+        assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+    }
+
+    @Test
+    public void testParallelismScaling() {
+        var op = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                5,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                10,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+
+        assertEquals(
+                8,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+        assertEquals(
+                10,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        assertEquals(
+                4,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
+        assertEquals(
+                5,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
+        assertEquals(
+                4,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testParallelismComputation() {
+        final int minParallelism = 1;
+        final int maxParallelism = Integer.MAX_VALUE;
+        assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism));
+        assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism));
+        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism));
+        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism));
+        assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, maxParallelism));
+        assertEquals(
+                720,
+                ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+    }
+
+    @Test
+    public void testParallelismComputationWithLimit() {
+        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700));
+        assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700));
+
+        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
+        assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+
+        assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300));
+        assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+    }
+
+    @Test
+    public void ensureMinParallelismDoesNotExceedMax() {
+        Assert.assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        assertEquals(
+                                600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
+    }
+
+    @Test
+    public void testMinParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        assertEquals(
+                5,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 100, 500),
+                        Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testMaxParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                10,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 500, 100),
+                        Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testScaleDownAfterScaleUpDetection() throws Exception {
+        var op = new JobVertexID();
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1));
+
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
+        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+
+        // Should not allow scaling back down immediately
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
+        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+
+        // Pass some time...
+        var clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
+        scalingDecisionExecutor.setClock(clock);
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
+        assertEquals(Map.of(op, 5), getScaledParallelism(flinkDep));
+
+        // Allow immediate scale up
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
+        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+    }
+
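+    /** Builds an evaluated metric map for a single vertex from the given test values. */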
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate, double catchupRate) {
+        var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
+        metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
+        metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
+        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+        metrics.put(
+                ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
+        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+        return metrics;
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate) {
+        return evaluated(parallelism, target, procRate, 0.);
+    }
+
+    protected static Map<JobVertexID, Integer> getScaledParallelism(
+            AbstractFlinkResource<?, ?> resource) {
+
+        var conf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
+        var overrides = conf.get(PARALLELISM_OVERRIDES);
+        if (overrides == null || overrides.isEmpty()) {
+            return null;
+        }
+
+        var out = new HashMap<JobVertexID, Integer>();
+
+        overrides.forEach((k, v) -> out.put(JobVertexID.fromHexString(k), Integer.parseInt(v)));
+        return out;
+    }
+}