Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:16:03 UTC

[flink-kubernetes-operator] 04/09: [FLINK-30260][autoscaler] Add scaling metric evaluation logic

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 0b3cc1312e9070ac0a28cea76e1c6d9284f52acd
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 08:59:46 2022 +0100

    [FLINK-30260][autoscaler] Add scaling metric evaluation logic
---
 .../autoscaler/ScalingMetricEvaluator.java         | 228 +++++++++++++
 .../autoscaler/BacklogBasedScalingTest.java        | 352 +++++++++++++++++++++
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 227 +++++++++++++
 3 files changed, 807 insertions(+)
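
For orientation, the entry point added by this change is ScalingMetricEvaluator#evaluate, which
turns the collected per-vertex metric history into evaluated metrics (latest value plus window
average) that the scaling decision later consumes. A minimal usage sketch follows; it is not part
of the commit and simply mirrors the wiring used in ScalingMetricEvaluatorTest below (imports as
in that test file):

    var source = new JobVertexID();
    var sink = new JobVertexID();
    var topology =
            new JobTopology(
                    new VertexInfo(source, Set.of(), 1, 720),
                    new VertexInfo(sink, Set.of(source), 1, 720));

    // One collected data point; in the operator this history is filled by the metrics collector.
    var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
    metricHistory.put(
            Instant.now(),
            Map.of(
                    source,
                    Map.of(SOURCE_DATA_RATE, 100., LAG, 0., OUTPUT_RATIO, 2.,
                            TRUE_OUTPUT_RATE, 200., TRUE_PROCESSING_RATE, 200.),
                    sink,
                    Map.of(TRUE_PROCESSING_RATE, 2000.)));

    var conf = new Configuration();
    conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));

    var evaluator = new ScalingMetricEvaluator();
    var evaluated = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
    double scaleUpThreshold = evaluated.get(source).get(SCALE_UP_RATE_THRESHOLD).getCurrent();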

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
new file mode 100644
index 00000000..fdf5e33f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.stat.StatUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Job scaling evaluator for autoscaler. */
+public class ScalingMetricEvaluator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);
+
+    private Clock clock = Clock.systemDefaultZone();
+
+    public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate(
+            Configuration conf, CollectedMetrics collectedMetrics) {
+
+        var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
+        var metricsHistory = collectedMetrics.getMetricHistory();
+        var topology = collectedMetrics.getJobTopology();
+
+        for (var vertex : topology.getVerticesInTopologicalOrder()) {
+            scalingOutput.put(
+                    vertex,
+                    computeVertexScalingSummary(
+                            conf, scalingOutput, metricsHistory, topology, vertex));
+        }
+
+        return scalingOutput;
+    }
+
+    @NotNull
+    private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary(
+            Configuration conf,
+            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+            JobTopology topology,
+            JobVertexID vertex) {
+
+        var latestVertexMetrics = metricsHistory.get(metricsHistory.lastKey()).get(vertex);
+
+        var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        computeTargetDataRate(
+                topology,
+                vertex,
+                conf,
+                scalingOutput,
+                metricsHistory,
+                latestVertexMetrics,
+                evaluatedMetrics);
+
+        evaluatedMetrics.put(
+                TRUE_PROCESSING_RATE,
+                new EvaluatedScalingMetric(
+                        latestVertexMetrics.get(TRUE_PROCESSING_RATE),
+                        getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory, conf)));
+
+        evaluatedMetrics.put(
+                PARALLELISM, EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex)));
+        evaluatedMetrics.put(
+                MAX_PARALLELISM,
+                EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
+
+        computeProcessingRateThresholds(evaluatedMetrics, conf);
+
+        var isSink = topology.getOutputs().get(vertex).isEmpty();
+        if (!isSink) {
+            evaluatedMetrics.put(
+                    TRUE_OUTPUT_RATE,
+                    new EvaluatedScalingMetric(
+                            latestVertexMetrics.get(TRUE_OUTPUT_RATE),
+                            getAverage(TRUE_OUTPUT_RATE, vertex, metricsHistory, conf)));
+            evaluatedMetrics.put(
+                    OUTPUT_RATIO,
+                    new EvaluatedScalingMetric(
+                            latestVertexMetrics.get(OUTPUT_RATIO),
+                            getAverage(OUTPUT_RATIO, vertex, metricsHistory, conf)));
+        }
+
+        return evaluatedMetrics;
+    }
+
+    @VisibleForTesting
+    protected static void computeProcessingRateThresholds(
+            Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf) {
+
+        double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
+
+        double scaleUpThreshold =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        metrics, conf, conf.get(TARGET_UTILIZATION) + utilizationBoundary, false);
+
+        double scaleDownThreshold =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        metrics, conf, conf.get(TARGET_UTILIZATION) - utilizationBoundary, true);
+
+        metrics.put(SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleUpThreshold));
+        metrics.put(SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleDownThreshold));
+    }
+
+    private void computeTargetDataRate(
+            JobTopology topology,
+            JobVertexID vertex,
+            Configuration conf,
+            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Map<ScalingMetric, EvaluatedScalingMetric> out) {
+
+        boolean isSource = topology.getInputs().get(vertex).isEmpty();
+        if (isSource) {
+            double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
+
+            var sourceRateMetric =
+                    latestVertexMetrics.containsKey(TARGET_DATA_RATE)
+                            ? TARGET_DATA_RATE
+                            : SOURCE_DATA_RATE;
+            if (!latestVertexMetrics.containsKey(sourceRateMetric)) {
+                throw new RuntimeException(
+                        "Cannot evaluate metrics without source target rate information");
+            }
+
+            out.put(
+                    TARGET_DATA_RATE,
+                    new EvaluatedScalingMetric(
+                            latestVertexMetrics.get(sourceRateMetric),
+                            getAverage(sourceRateMetric, vertex, metricsHistory, conf)));
+
+            double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
+            double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec;
+            if (catchUpInputRate > 0) {
+                LOG.info(
+                        "Extra backlog processing input rate for {} is {}",
+                        vertex,
+                        catchUpInputRate);
+            }
+            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate));
+        } else {
+            var inputs = topology.getInputs().get(vertex);
+            double sumCurrentTargetRate = 0;
+            double sumAvgTargetRate = 0;
+            double sumCatchUpDataRate = 0;
+            for (var inputVertex : inputs) {
+                var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
+                var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
+                var outputRateMultiplier = inputEvaluatedMetrics.get(OUTPUT_RATIO).getAverage();
+                sumCurrentTargetRate += inputTargetRate.getCurrent() * outputRateMultiplier;
+                sumAvgTargetRate += inputTargetRate.getAverage() * outputRateMultiplier;
+                sumCatchUpDataRate +=
+                        inputEvaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent()
+                                * outputRateMultiplier;
+            }
+            out.put(
+                    TARGET_DATA_RATE,
+                    new EvaluatedScalingMetric(sumCurrentTargetRate, sumAvgTargetRate));
+            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate));
+        }
+    }
+
+    private double getAverage(
+            ScalingMetric metric,
+            JobVertexID jobVertexId,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+            Configuration conf) {
+        return StatUtils.mean(
+                metricsHistory
+                        .tailMap(clock.instant().minus(conf.get(AutoScalerOptions.METRICS_WINDOW)))
+                        .values().stream()
+                        .map(m -> m.get(jobVertexId))
+                        .filter(m -> m.containsKey(metric))
+                        .mapToDouble(m -> m.get(metric))
+                        .filter(d -> !Double.isNaN(d))
+                        .toArray());
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+}
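
To make the target-rate propagation in computeTargetDataRate above concrete, here is the
arithmetic for the latest data point used in ScalingMetricEvaluatorTest further down (source data
rate 200 rec/s, lag 1000 records, OUTPUT_RATIO 2, catch-up window 2s). This is only a sketch of
the expected values; the real numbers come from the evaluator above:

    // Source vertex: target rate is its own data rate; catch-up rate is lag / catch-up window.
    double sourceTargetRate = 200.;            // SOURCE_DATA_RATE
    double sourceCatchUpRate = 1000. / 2;      // LAG / CATCH_UP_DURATION = 500
    // Non-source vertex: sum over its inputs of (upstream target rate * upstream OUTPUT_RATIO).
    double sinkTargetRate = sourceTargetRate * 2.;     // 400
    double sinkCatchUpRate = sourceCatchUpRate * 2.;   // 1000
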
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
new file mode 100644
index 00000000..1e1466c8
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Test for backlog based scaling logic. */
+@EnableKubernetesMockClient(crud = true)
+public class BacklogBasedScalingTest {
+
+    private ScalingMetricEvaluator evaluator;
+    private TestingFlinkService service;
+    private TestingMetricsCollector metricsCollector;
+    private ScalingExecutor scalingExecutor;
+
+    private FlinkDeployment app;
+    private JobVertexID source1, sink;
+
+    private FlinkConfigManager confManager;
+    private JobAutoScaler autoscaler;
+
+    private KubernetesClient kubernetesClient;
+
+    @BeforeEach
+    public void setup() {
+        evaluator = new ScalingMetricEvaluator();
+        scalingExecutor = new ScalingExecutor(kubernetesClient);
+        service = new TestingFlinkService();
+
+        app = TestUtils.buildApplicationCluster();
+        app.getMetadata().setGeneration(1L);
+        app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
+        kubernetesClient.resource(app).createOrReplace();
+
+        source1 = new JobVertexID();
+        sink = new JobVertexID();
+
+        metricsCollector =
+                new TestingMetricsCollector(
+                        new JobTopology(
+                                new VertexInfo(source1, Set.of(), 1, 720),
+                                new VertexInfo(sink, Set.of(source1), 1, 720)));
+
+        var defaultConf = new Configuration();
+        defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
+        defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+
+        confManager = new FlinkConfigManager(defaultConf);
+        ReconciliationUtils.updateStatusForDeployedSpec(
+                app, confManager.getDeployConfig(app.getMetadata(), app.getSpec()));
+        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+
+        autoscaler =
+                new JobAutoScaler(
+                        kubernetesClient,
+                        confManager,
+                        metricsCollector,
+                        evaluator,
+                        scalingExecutor,
+                        TestUtils.createTestMetricGroup(new Configuration()));
+    }
+
+    @Test
+    public void test() throws Exception {
+        var ctx = createAutoscalerTestContext();
+        var now = Instant.now();
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 2000.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 500.))));
+
+        autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx);
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx);
+
+        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        metricsCollector.setJobTopology(
+                new JobTopology(
+                        new VertexInfo(source1, Set.of(), 4, 24),
+                        new VertexInfo(sink, Set.of(source1), 4, 720)));
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 2500.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1800.))));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1200.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1800.))));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        // We have finally caught up to our original lag, time to scale down
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 800.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+        metricsCollector.setJobTopology(
+                new JobTopology(
+                        new VertexInfo(source1, Set.of(), 2, 24),
+                        new VertexInfo(sink, Set.of(source1), 2, 720)));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 900.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 100.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 900.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 500.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+    }
+
+    private void setClocksTo(Instant time) {
+        var clock = Clock.fixed(time, ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        evaluator.setClock(clock);
+        scalingExecutor.setClock(clock);
+    }
+
+    @NotNull
+    private TestUtils.TestingContext<HasMetadata> createAutoscalerTestContext() {
+        return new TestUtils.TestingContext<>() {
+            public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
+                return (Set)
+                        kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream()
+                                .collect(Collectors.toSet());
+            }
+        };
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
new file mode 100644
index 00000000..7a4af640
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Scaling evaluator test. */
+public class ScalingMetricEvaluatorTest {
+
+    @Test
+    public void testLagBasedSourceScaling() {
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE, 200.,
+                                LAG, 1000.,
+                                OUTPUT_RATIO, 2.,
+                                TRUE_OUTPUT_RATE, 200.,
+                                TRUE_PROCESSING_RATE, 200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        var evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(500),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(1000),
+                evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(1000),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(2000),
+                evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        // Restart time should not affect evaluated metrics
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
+
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(1000),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(2000),
+                evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        // Turn off lag based scaling
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(0), evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(0), evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        // Test 0 lag
+        metricHistory.clear();
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(100, 100),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 200),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+    }
+
+    @Test
+    public void testUtilizationBoundaryComputation() {
+
+        var conf = new Configuration();
+        conf.set(TARGET_UTILIZATION, 0.8);
+        conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+        conf.set(RESTART_TIME, Duration.ofSeconds(1));
+        conf.set(CATCH_UP_DURATION, Duration.ZERO);
+
+        // Default behaviour, restart time does not factor in
+        assertEquals(Tuple2.of(778.0, 1000.0), getThresholds(700, 0, conf));
+
+        conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
+        assertEquals(Tuple2.of(1128.0, 1700.0), getThresholds(700, 350, conf));
+        assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
+    }
+
+    private Tuple2<Double, Double> getThresholds(
+            double inputTargetRate, double catchUpRate, Configuration conf) {
+        var map = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+        map.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN, inputTargetRate));
+        map.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
+
+        ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf);
+        return Tuple2.of(
+                map.get(SCALE_UP_RATE_THRESHOLD).getCurrent(),
+                map.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent());
+    }
+}
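
The expected numbers in testUtilizationBoundaryComputation can be reproduced as follows. Since
AutoScalerUtils.getTargetProcessingCapacity is not part of this commit, the formula below is
reconstructed from the expected values and should be read as an assumption, not as the actual
implementation:

    double target = 700., utilization = 0.8, boundary = 0.1;
    double restartSec = 1., catchUpSec = 2., catchUpRate = 350.;

    // Scale-up threshold: target rate at the upper utilization bound, plus the catch-up rate.
    double scaleUp = target / (utilization + boundary) + catchUpRate;    // ~778 + 350 = ~1128
    // Scale-down threshold: lower utilization bound, plus the catch-up rate, plus the backlog
    // accumulated during the restart that must be worked off within the catch-up window
    // (this last term apparently drops out when the catch-up window is zero).
    double restartBacklogRate = target * restartSec / catchUpSec;        // 350
    double scaleDown =
            target / (utilization - boundary) + catchUpRate + restartBacklogRate;   // 1000 + 350 + 350 = 1700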