Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/12 11:57:07 UTC

[flink-kubernetes-operator] 01/04: Decouple scaling execution from parallelism computation

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit a66993089eb0e8fb01503d8e1b4f54b0f3983ade
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jan 5 16:12:04 2023 +0100

    Decouple scaling execution from parallelism computation
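
    The per-vertex parallelism math moves out of ScalingExecutor into a new
    JobVertexScaler component; ScalingExecutor keeps the scaling decision,
    history handling and Kubernetes interaction and only delegates the
    computation. Condensed from the diff below, the resulting call path is
    roughly (illustrative sketch, not the complete classes):

        public ScalingExecutor(KubernetesClient kubernetesClient) {
            this(kubernetesClient, new JobVertexScaler());
        }

        // per vertex, while computing the parallelism overrides:
        var newParallelism =
                jobVertexScaler.computeScaleTargetParallelism(conf, v, metrics, history);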
---
 .../operator/autoscaler/JobVertexScaler.java       | 189 +++++++++++++++++++++
 .../operator/autoscaler/ScalingExecutor.java       | 148 +---------------
 .../operator/autoscaler/JobVertexScalerTest.java   | 186 ++++++++++++++++++++
 .../operator/autoscaler/ScalingExecutorTest.java   | 122 +------------
 4 files changed, 384 insertions(+), 261 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
new file mode 100644
index 00000000..7b21a244
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (!history.isEmpty()) {
+            if (detectImmediateScaleDownAfterScaleUp(
+                    conf, history, currentParallelism, newParallelism)) {
+                LOG.info(
+                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+                        vertex,
+                        currentParallelism);
+                newParallelism = currentParallelism;
+            }
+
+            // Sketch of a possible future check (not implemented here), e.g.:
+            // currentParallelism = 2, newParallelism = 1, minimumProcRate = 1000 r/s;
+            // history shows scaling currentParallelism 1 => 3 -> empiricalProcRate = 800;
+            // if empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+        }
+
+        return newParallelism;
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean isScaleDown = newParallelism < currentParallelism;
+        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+
+        boolean withinConfiguredTime =
+                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+
+        return isScaleDown && lastScaleUp && withinConfiguredTime;
+    }
+
+    @VisibleForTesting
+    protected static int scale(
+            int parallelism,
+            int numKeyGroups,
+            double scaleFactor,
+            int minParallelism,
+            int maxParallelism) {
+        Preconditions.checkArgument(
+                minParallelism <= maxParallelism,
+                "The minimum parallelism must not be greater than the maximum parallelism.");
+        if (minParallelism > numKeyGroups) {
+            LOG.warn(
+                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
+                    minParallelism,
+                    numKeyGroups);
+        }
+        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
+            LOG.warn(
+                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
+                    maxParallelism,
+                    numKeyGroups);
+        }
+
+        int newParallelism =
+                // Prevent integer overflow when converting from double to integer.
+                // We do not have to detect underflow because doubles cannot
+                // underflow.
+                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
+
+        // Cap parallelism at either number of key groups or parallelism limit
+        final int upperBound = Math.min(numKeyGroups, maxParallelism);
+
+        // Apply min/max parallelism
+        newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
+
+        // Try to adjust the parallelism such that it divides the number of key groups without a
+        // remainder => state is evenly spread across subtasks
+        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+            if (numKeyGroups % p == 0) {
+                return p;
+            }
+        }
+
+        // If key group adjustment fails, use originally computed parallelism
+        return newParallelism;
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+}
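
As an illustrative aside (not part of the diff above), two expectations from the
new JobVertexScalerTest further down show how the key-group adjustment in
JobVertexScaler#scale behaves:

    // scaleFactor 1.5 on parallelism 16 gives ceil(1.5 * 16) = 24, but 128 key
    // groups do not divide evenly by 24, so the divisor loop walks up to 32
    // (128 % 32 == 0).
    assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));

    // With a tight max parallelism the result is clamped to the upper bound; the
    // divisor search cannot go past the bound, so the capped value 300 is returned
    // even though 300 does not divide the 720 key groups evenly.
    assertEquals(300, JobVertexScaler.scale(200, 720, 2.0, 1, 300));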
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index df2d96a9..976ec56e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -35,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Clock;
-import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collections;
@@ -43,15 +41,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
@@ -70,11 +61,17 @@ public class ScalingExecutor implements Cleanup {
     private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final KubernetesClient kubernetesClient;
+    private final JobVertexScaler jobVertexScaler;
 
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
     public ScalingExecutor(KubernetesClient kubernetesClient) {
+        this(kubernetesClient, new JobVertexScaler());
+    }
+
+    public ScalingExecutor(KubernetesClient kubernetesClient, JobVertexScaler jobVertexScaler) {
         this.kubernetesClient = kubernetesClient;
+        this.jobVertexScaler = jobVertexScaler;
     }
 
     public boolean scaleResource(
@@ -192,7 +189,7 @@ public class ScalingExecutor implements Cleanup {
                     var currentParallelism =
                             (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
                     var newParallelism =
-                            computeScaleTargetParallelism(
+                            jobVertexScaler.computeScaleTargetParallelism(
                                     conf,
                                     v,
                                     metrics,
@@ -205,136 +202,6 @@ public class ScalingExecutor implements Cleanup {
         return out;
     }
 
-    protected int computeScaleTargetParallelism(
-            Configuration conf,
-            JobVertexID vertex,
-            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
-            SortedMap<Instant, ScalingSummary> history) {
-
-        var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
-        double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
-
-        if (Double.isNaN(averageTrueProcessingRate)) {
-            LOG.info(
-                    "True processing rate is not available for {}, cannot compute new parallelism",
-                    vertex);
-            return currentParallelism;
-        }
-
-        double targetCapacity =
-                AutoScalerUtils.getTargetProcessingCapacity(
-                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
-        if (Double.isNaN(targetCapacity)) {
-            LOG.info(
-                    "Target data rate is not available for {}, cannot compute new parallelism",
-                    vertex);
-            return currentParallelism;
-        }
-
-        LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
-        double scaleFactor = targetCapacity / averageTrueProcessingRate;
-        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
-        if (scaleFactor < minScaleFactor) {
-            LOG.info(
-                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
-                    scaleFactor,
-                    vertex,
-                    minScaleFactor);
-            scaleFactor = minScaleFactor;
-        }
-
-        int newParallelism =
-                scale(
-                        currentParallelism,
-                        (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
-                        scaleFactor,
-                        conf.getInteger(VERTEX_MIN_PARALLELISM),
-                        conf.getInteger(VERTEX_MAX_PARALLELISM));
-
-        if (!history.isEmpty()) {
-            if (detectImmediateScaleDownAfterScaleUp(
-                    conf, history, currentParallelism, newParallelism)) {
-                LOG.info(
-                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
-                        vertex,
-                        currentParallelism);
-                newParallelism = currentParallelism;
-            }
-
-            // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
-            // history
-            // currentParallelism 1 => 3 -> empiricalProcRate = 800
-            // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
-        }
-
-        return newParallelism;
-    }
-
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            Configuration conf,
-            SortedMap<Instant, ScalingSummary> history,
-            int currentParallelism,
-            int newParallelism) {
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
-
-        boolean isScaleDown = newParallelism < currentParallelism;
-        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
-
-        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
-
-        boolean withinConfiguredTime =
-                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
-
-        return isScaleDown && lastScaleUp && withinConfiguredTime;
-    }
-
-    public static int scale(
-            int parallelism,
-            int numKeyGroups,
-            double scaleFactor,
-            int minParallelism,
-            int maxParallelism) {
-        Preconditions.checkArgument(
-                minParallelism <= maxParallelism,
-                "The minimum parallelism must not be greater than the maximum parallelism.");
-        if (minParallelism > numKeyGroups) {
-            LOG.warn(
-                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
-                    minParallelism,
-                    numKeyGroups);
-        }
-        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
-            LOG.warn(
-                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
-                    maxParallelism,
-                    numKeyGroups);
-        }
-
-        int newParallelism =
-                // Prevent integer overflow when converting from double to integer.
-                // We do not have to detect underflow because doubles cannot
-                // underflow.
-                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
-
-        // Cap parallelism at either number of key groups or parallelism limit
-        final int upperBound = Math.min(numKeyGroups, maxParallelism);
-
-        // Apply min/max parallelism
-        newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
-
-        // Try to adjust the parallelism such that it divides the number of key groups without a
-        // remainder => state is evenly spread across subtasks
-        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
-            if (numKeyGroups % p == 0) {
-                return p;
-            }
-        }
-
-        // If key group adjustment fails, use originally computed parallelism
-        return newParallelism;
-    }
-
     private void setVertexParallelismOverrides(
             AbstractFlinkResource<?, ?> resource,
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
@@ -362,6 +229,7 @@ public class ScalingExecutor implements Cleanup {
     @VisibleForTesting
     protected void setClock(Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
+        jobVertexScaler.setClock(clock);
     }
 
     @Override
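
Since the executor's setClock now also forwards to the scaler, tests can pin
time for the SCALE_UP_GRACE_PERIOD check through a single call. A minimal
sketch (assuming an already constructed ScalingExecutor instance named
scalingExecutor, which is not shown in this diff):

    // Fix the clock so detectImmediateScaleDownAfterScaleUp evaluates the grace
    // period deterministically; forwarded to the embedded JobVertexScaler as well.
    var now = Instant.now();
    scalingExecutor.setClock(Clock.fixed(now, ZoneId.systemDefault()));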
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
new file mode 100644
index 00000000..e3a774f7
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for vertex parallelism scaler logic. */
+public class JobVertexScalerTest {
+
+    private JobVertexScaler vertexScaler;
+    private Configuration conf;
+
+    @BeforeEach
+    public void setup() {
+        vertexScaler = new JobVertexScaler();
+        conf = new Configuration();
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+    }
+
+    @Test
+    public void testParallelismScaling() {
+        var op = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+
+        assertEquals(
+                8,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        assertEquals(
+                4,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
+        assertEquals(
+                4,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testParallelismComputation() {
+        final int minParallelism = 1;
+        final int maxParallelism = Integer.MAX_VALUE;
+        assertEquals(1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, maxParallelism));
+        assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, maxParallelism));
+        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, maxParallelism));
+        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, maxParallelism));
+        assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, maxParallelism));
+        assertEquals(
+                720,
+                JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+    }
+
+    @Test
+    public void testParallelismComputationWithLimit() {
+        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700));
+        assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700));
+
+        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
+        assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+
+        assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300));
+        assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+    }
+
+    @Test
+    public void ensureMinParallelismDoesNotExceedMax() {
+        Assert.assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        assertEquals(
+                                600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
+    }
+
+    @Test
+    public void testMinParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 100, 500),
+                        Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testMaxParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 500, 100),
+                        Collections.emptySortedMap()));
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate, double catchupRate) {
+        var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
+        metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
+        metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
+        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+        metrics.put(
+                ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
+        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+        return metrics;
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate) {
+        return evaluated(parallelism, target, procRate, 0.);
+    }
+}
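
Reading one of the expectations above as a worked arithmetic example (assuming
AutoScalerUtils.getTargetProcessingCapacity evaluates to the target data rate
divided by the configured utilization, plus the catch-up rate, which is zero in
the three-argument evaluated() helper):

    // evaluated(10, 50, 100) with TARGET_UTILIZATION = 0.8:
    //   target capacity  = 50 / 0.8                        = 62.5
    //   scale factor     = 62.5 / 100 (true processing rate) = 0.625
    //   raw parallelism  = ceil(0.625 * 10)                 = 7
    //   key-group adjust : 720 % 7 != 0, walk up until 720 % 8 == 0  -> 8
    assertEquals(8, vertexScaler.computeScaleTargetParallelism(
            conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));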
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 9aa67145..e1dc139e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -37,7 +36,6 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -47,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Test for scaling metrics collection logic. */
+/** Test for scaling execution logic. */
 @EnableKubernetesMockClient(crud = true)
 public class ScalingExecutorTest {
 
@@ -172,124 +170,6 @@ public class ScalingExecutorTest {
         assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
     }
 
-    @Test
-    public void testParallelismScaling() {
-        var op = new JobVertexID();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        assertEquals(
-                5,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
-        assertEquals(
-                8,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
-        assertEquals(
-                10,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
-        assertEquals(
-                8,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
-
-        assertEquals(
-                8,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
-        assertEquals(
-                10,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
-        assertEquals(
-                4,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
-        assertEquals(
-                5,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
-        assertEquals(
-                4,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
-    }
-
-    @Test
-    public void testParallelismComputation() {
-        final int minParallelism = 1;
-        final int maxParallelism = Integer.MAX_VALUE;
-        assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism));
-        assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism));
-        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism));
-        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism));
-        assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, maxParallelism));
-        assertEquals(
-                720,
-                ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
-    }
-
-    @Test
-    public void testParallelismComputationWithLimit() {
-        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700));
-        assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700));
-
-        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
-        assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
-
-        assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300));
-        assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600));
-    }
-
-    @Test
-    public void ensureMinParallelismDoesNotExceedMax() {
-        Assert.assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        assertEquals(
-                                600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
-    }
-
-    @Test
-    public void testMinParallelismLimitIsUsed() {
-        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
-        assertEquals(
-                5,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf,
-                        new JobVertexID(),
-                        evaluated(10, 100, 500),
-                        Collections.emptySortedMap()));
-    }
-
-    @Test
-    public void testMaxParallelismLimitIsUsed() {
-        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        assertEquals(
-                10,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf,
-                        new JobVertexID(),
-                        evaluated(10, 500, 100),
-                        Collections.emptySortedMap()));
-    }
-
     @Test
     public void testScaleDownAfterScaleUpDetection() throws Exception {
         var op = new JobVertexID();