You are viewing a plain-text version of this content. The link to the canonical version ("here") was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/12 11:57:06 UTC

[flink-kubernetes-operator] branch main updated (35604761 -> 8e41ed5e)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from 35604761 [FLINK-29607] Introduce FlinkResourceContext to simplify observer/reconciler logic
     new a6699308 Decouple scaling execution from parallelism computation
     new ed6acd45 [FLINK-30574] Detect ineffective scale up operations to avoid scaling further
     new 98eec733 [hotfix] Improve AutoScalerInfo
     new 8e41ed5e [hotfix][e2e] Pin busybox version

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/custom-resource/pod-template.md  |   2 +-
 .../generated/auto_scaler_configuration.html       |  24 ++
 e2e-tests/data/flinkdep-cr.yaml                    |   2 +-
 examples/pod-template.yaml                         |   2 +-
 .../operator/autoscaler/AutoScalerInfo.java        |  35 ++-
 .../operator/autoscaler/JobAutoScaler.java         |   5 +-
 .../operator/autoscaler/JobVertexScaler.java       | 245 +++++++++++++++++++
 .../operator/autoscaler/ScalingExecutor.java       | 150 +-----------
 .../autoscaler/ScalingMetricCollector.java         |  55 ++---
 .../operator/autoscaler/ScalingSummary.java        |   6 +
 .../autoscaler/config/AutoScalerOptions.java       |  27 +++
 .../operator/autoscaler/metrics/ScalingMetric.java |   5 +-
 .../operator/autoscaler/AutoScalerInfoTest.java    | 100 ++++++++
 .../operator/autoscaler/JobVertexScalerTest.java   | 268 +++++++++++++++++++++
 .../MetricsCollectionAndEvaluationTest.java        |  13 +
 .../operator/autoscaler/ScalingExecutorTest.java   | 153 +-----------
 16 files changed, 758 insertions(+), 334 deletions(-)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java


[flink-kubernetes-operator] 03/04: [hotfix] Improve AutoScalerInfo

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 98eec733cce17ffe138bdc83707c7e7dfa3fc111
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Jan 11 11:49:26 2023 +0100

    [hotfix] Improve AutoScalerInfo
---
 .../generated/auto_scaler_configuration.html       |  12 +++
 .../operator/autoscaler/AutoScalerInfo.java        |  35 ++++++--
 .../operator/autoscaler/ScalingExecutor.java       |   2 +-
 .../autoscaler/ScalingMetricCollector.java         |  50 +++++------
 .../autoscaler/config/AutoScalerOptions.java       |  13 +++
 .../operator/autoscaler/AutoScalerInfoTest.java    | 100 +++++++++++++++++++++
 .../MetricsCollectionAndEvaluationTest.java        |  13 +++
 7 files changed, 190 insertions(+), 35 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index f4e5cd9a..e1e98204 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -20,6 +20,18 @@
             <td>Boolean</td>
             <td>Enable job autoscaler module.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.history.max.age</h5></td>
+            <td style="word-wrap: break-word;">86400000 ms</td>
+            <td>Duration</td>
+            <td>Maximum age for past scaling decisions to retain.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.history.max.count</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Maximum number of past scaling decisions to retain per vertex.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.autoscaler.metrics.window</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index 6f35281f..bbfd9ab4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -18,7 +18,9 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.utils.JobVertexSerDeModule;
 import org.apache.flink.kubernetes.utils.Constants;
@@ -36,9 +38,9 @@ import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
@@ -49,9 +51,6 @@ public class AutoScalerInfo {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
 
-    private static final int SCALING_HISTORY_MAX_COUNT = 5;
-    private static final Duration SCALING_HISTORY_MAX_DURATION = Duration.ofHours(24);
-
     private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
 
     private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
@@ -94,6 +93,18 @@ public class AutoScalerInfo {
         configMap.getData().put(JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
     }
 
+    @SneakyThrows
+    public void updateVertexList(List<JobVertexID> vertexList) {
+        // Make sure to init history
+        getScalingHistory();
+
+        if (scalingHistory.keySet().removeIf(v -> !vertexList.contains(v))) {
+            configMap
+                    .getData()
+                    .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory));
+        }
+    }
+
     public void clearMetricHistory() {
         configMap.getData().remove(COLLECTED_METRICS_KEY);
         configMap.getData().remove(JOB_UPDATE_TS_KEY);
@@ -117,7 +128,8 @@ public class AutoScalerInfo {
     }
 
     @SneakyThrows
-    public void addToScalingHistory(Instant now, Map<JobVertexID, ScalingSummary> summaries) {
+    public void addToScalingHistory(
+            Instant now, Map<JobVertexID, ScalingSummary> summaries, Configuration conf) {
         // Make sure to init history
         getScalingHistory();
 
@@ -129,10 +141,17 @@ public class AutoScalerInfo {
         while (entryIt.hasNext()) {
             var entry = entryIt.next();
             // Limit how long past scaling decisions are remembered
-            entry.setValue(entry.getValue().tailMap(now.minus(SCALING_HISTORY_MAX_DURATION)));
+            entry.setValue(
+                    entry.getValue()
+                            .tailMap(
+                                    now.minus(
+                                            conf.get(
+                                                    AutoScalerOptions
+                                                            .VERTEX_SCALING_HISTORY_AGE))));
             var vertexHistory = entry.getValue();
-            while (vertexHistory.size() > SCALING_HISTORY_MAX_COUNT) {
-                vertexHistory.remove(vertexHistory.lastKey());
+            while (vertexHistory.size()
+                    > conf.get(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT)) {
+                vertexHistory.remove(vertexHistory.firstKey());
             }
             if (vertexHistory.isEmpty()) {
                 entryIt.remove();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 976ec56e..99db185a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -111,7 +111,7 @@ public class ScalingExecutor implements Cleanup {
         setVertexParallelismOverrides(resource, evaluatedMetrics, scalingSummaries);
 
         KubernetesClientUtils.replaceSpecAfterScaling(kubernetesClient, resource);
-        scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries);
+        scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries, conf);
 
         return true;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 58e7fd9f..1e468270 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -49,12 +49,10 @@ import org.slf4j.LoggerFactory;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -97,7 +95,7 @@ public abstract class ScalingMetricCollector implements Cleanup {
             cleanup(cr);
         }
 
-        var topology = getJobTopology(flinkService, cr, conf);
+        var topology = getJobTopology(flinkService, cr, conf, scalingInformation);
 
         var stabilizationDuration = conf.get(AutoScalerOptions.STABILIZATION_INTERVAL);
         var stableTime = currentJobUpdateTs.plus(stabilizationDuration);
@@ -151,14 +149,22 @@ public abstract class ScalingMetricCollector implements Cleanup {
     }
 
     protected JobTopology getJobTopology(
-            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, Configuration conf)
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            AutoScalerInfo scalerInfo)
             throws Exception {
 
         try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
             var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
             var topology =
                     topologies.computeIfAbsent(
-                            ResourceID.fromResource(cr), r -> queryJobTopology(restClient, jobId));
+                            ResourceID.fromResource(cr),
+                            r -> {
+                                var t = queryJobTopology(restClient, jobId);
+                                scalerInfo.updateVertexList(t.getVerticesInTopologicalOrder());
+                                return t;
+                            });
             updateKafkaSourceMaxParallelisms(restClient, jobId, topology);
             return topology;
         }
@@ -212,13 +218,6 @@ public abstract class ScalingMetricCollector implements Cleanup {
         }
     }
 
-    private List<JobVertexID> getVertexList(
-            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, Configuration conf)
-            throws Exception {
-        JobTopology topology = getJobTopology(flinkService, cr, conf);
-        return new ArrayList<>(topology.getParallelisms().keySet());
-    }
-
     /**
      * Given a map of collected Flink vertex metrics we compute the scaling metrics for each job
      * vertex.
@@ -301,7 +300,7 @@ public abstract class ScalingMetricCollector implements Cleanup {
             JobTopology topology) {
 
         var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
-        var vertices = getVertexList(flinkService, cr, conf);
+        var vertices = topology.getVerticesInTopologicalOrder();
 
         long deployedGeneration = getDeployedGeneration(cr);
 
@@ -390,19 +389,18 @@ public abstract class ScalingMetricCollector implements Cleanup {
         }
 
         requiredMetrics.forEach(
-                flinkMetric -> {
-                    filteredMetrics.put(
-                            flinkMetric
-                                    .findAny(allMetricNames)
-                                    .orElseThrow(
-                                            () ->
-                                                    new RuntimeException(
-                                                            "Could not find required metric "
-                                                                    + flinkMetric.name()
-                                                                    + " for "
-                                                                    + jobVertexID)),
-                            flinkMetric);
-                });
+                flinkMetric ->
+                        filteredMetrics.put(
+                                flinkMetric
+                                        .findAny(allMetricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required metric "
+                                                                        + flinkMetric.name()
+                                                                        + " for "
+                                                                        + jobVertexID)),
+                                flinkMetric));
 
         return filteredMetrics;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 721d85dc..07fa6069 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -132,4 +132,17 @@ public class AutoScalerOptions {
                     .defaultValue(0.1)
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
+
+    public static final ConfigOption<Integer> VERTEX_SCALING_HISTORY_COUNT =
+            autoScalerConfig("history.max.count")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Maximum number of past scaling decisions to retain per vertex.");
+
+    public static final ConfigOption<Duration> VERTEX_SCALING_HISTORY_AGE =
+            autoScalerConfig("history.max.age")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(24))
+                    .withDescription("Maximum age for past scaling decisions to retain.");
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
new file mode 100644
index 00000000..e4ba23ee
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for AutoScalerInfo. */
+public class AutoScalerInfoTest {
+
+    @Test
+    public void testTopologyUpdate() {
+        var data = new HashMap<String, String>();
+        var info = new AutoScalerInfo(data);
+
+        var v1 = new JobVertexID();
+        var v2 = new JobVertexID();
+        var v3 = new JobVertexID();
+
+        var history = new HashMap<JobVertexID, ScalingSummary>();
+        history.put(v1, new ScalingSummary(1, 2, null));
+        history.put(v2, new ScalingSummary(1, 2, null));
+
+        info.addToScalingHistory(Instant.now(), history, new Configuration());
+
+        assertEquals(history.keySet(), info.getScalingHistory().keySet());
+        assertEquals(history.keySet(), new AutoScalerInfo(data).getScalingHistory().keySet());
+
+        info.updateVertexList(List.of(v2, v3));
+
+        // Expect v1 to be removed
+        assertEquals(Set.of(v2), info.getScalingHistory().keySet());
+        assertEquals(Set.of(v2), new AutoScalerInfo(data).getScalingHistory().keySet());
+    }
+
+    @Test
+    public void testHistorySizeConfigs() {
+        var data = new HashMap<String, String>();
+        var info = new AutoScalerInfo(data);
+
+        var v1 = new JobVertexID();
+
+        var history = new HashMap<JobVertexID, ScalingSummary>();
+        history.put(v1, new ScalingSummary(1, 2, null));
+
+        var conf = new Configuration();
+        conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT, 2);
+        conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, Duration.ofSeconds(10));
+
+        var now = Instant.now();
+
+        // Verify count based expiration
+        info.addToScalingHistory(now, history, conf);
+        assertEquals(1, info.getScalingHistory().get(v1).size());
+
+        info.addToScalingHistory(now.plus(Duration.ofSeconds(1)), history, conf);
+        info.addToScalingHistory(now.plus(Duration.ofSeconds(2)), history, conf);
+
+        assertEquals(2, info.getScalingHistory().get(v1).size());
+        assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(1)), now.plus(Duration.ofSeconds(2))),
+                info.getScalingHistory().get(v1).keySet());
+
+        // Verify time based expiration
+        info.addToScalingHistory(now.plus(Duration.ofSeconds(15)), history, conf);
+        assertEquals(1, info.getScalingHistory().get(v1).size());
+        assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(15))),
+                info.getScalingHistory().get(v1).keySet());
+        assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(15))),
+                new AutoScalerInfo(data).getScalingHistory().get(v1).keySet());
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 61ed26ce..8387c620 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -336,4 +336,17 @@ public class MetricsCollectionAndEvaluationTest {
         app.getStatus().getJobStatus().setUpdateTime("0");
         assertEquals(1, metricsHistory.getMetricHistory().size());
     }
+
+    @Test
+    public void testClearHistoryOnTopoChange() throws Exception {
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+        setDefaultMetrics(metricsCollector);
+
+        // We haven't left the stabilization period
+        // => no metrics reporting and collection should take place
+        var collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
+        assertTrue(collectedMetrics.getMetricHistory().isEmpty());
+    }
 }


[flink-kubernetes-operator] 01/04: Decouple scaling execution from parallelism computation

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit a66993089eb0e8fb01503d8e1b4f54b0f3983ade
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jan 5 16:12:04 2023 +0100

    Decouple scaling execution from parallelism computation
---
 .../operator/autoscaler/JobVertexScaler.java       | 189 +++++++++++++++++++++
 .../operator/autoscaler/ScalingExecutor.java       | 148 +---------------
 .../operator/autoscaler/JobVertexScalerTest.java   | 186 ++++++++++++++++++++
 .../operator/autoscaler/ScalingExecutorTest.java   | 122 +------------
 4 files changed, 384 insertions(+), 261 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
new file mode 100644
index 00000000..7b21a244
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Component responsible for computing vertex parallelism based on the scaling metrics. */
+public class JobVertexScaler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (!history.isEmpty()) {
+            if (detectImmediateScaleDownAfterScaleUp(
+                    conf, history, currentParallelism, newParallelism)) {
+                LOG.info(
+                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+                        vertex,
+                        currentParallelism);
+                newParallelism = currentParallelism;
+            }
+
+            // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
+            // history
+            // currentParallelism 1 => 3 -> empiricalProcRate = 800
+            // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+        }
+
+        return newParallelism;
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean isScaleDown = newParallelism < currentParallelism;
+        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+
+        boolean withinConfiguredTime =
+                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+
+        return isScaleDown && lastScaleUp && withinConfiguredTime;
+    }
+
+    @VisibleForTesting
+    protected static int scale(
+            int parallelism,
+            int numKeyGroups,
+            double scaleFactor,
+            int minParallelism,
+            int maxParallelism) {
+        Preconditions.checkArgument(
+                minParallelism <= maxParallelism,
+                "The minimum parallelism must not be greater than the maximum parallelism.");
+        if (minParallelism > numKeyGroups) {
+            LOG.warn(
+                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
+                    minParallelism,
+                    numKeyGroups);
+        }
+        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
+            LOG.warn(
+                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
+                    maxParallelism,
+                    numKeyGroups);
+        }
+
+        int newParallelism =
+                // Prevent integer overflow when converting from double to integer.
+                // We do not have to detect underflow because doubles cannot
+                // underflow.
+                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
+
+        // Cap parallelism at either number of key groups or parallelism limit
+        final int upperBound = Math.min(numKeyGroups, maxParallelism);
+
+        // Apply min/max parallelism
+        newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
+
+        // Try to adjust the parallelism such that it divides the number of key groups without a
+        // remainder => state is evenly spread across subtasks
+        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+            if (numKeyGroups % p == 0) {
+                return p;
+            }
+        }
+
+        // If key group adjustment fails, use originally computed parallelism
+        return newParallelism;
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index df2d96a9..976ec56e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -35,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Clock;
-import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Collections;
@@ -43,15 +41,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
@@ -70,11 +61,17 @@ public class ScalingExecutor implements Cleanup {
     private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final KubernetesClient kubernetesClient;
+    private final JobVertexScaler jobVertexScaler;
 
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
     public ScalingExecutor(KubernetesClient kubernetesClient) {
+        this(kubernetesClient, new JobVertexScaler());
+    }
+
+    public ScalingExecutor(KubernetesClient kubernetesClient, JobVertexScaler jobVertexScaler) {
         this.kubernetesClient = kubernetesClient;
+        this.jobVertexScaler = jobVertexScaler;
     }
 
     public boolean scaleResource(
@@ -192,7 +189,7 @@ public class ScalingExecutor implements Cleanup {
                     var currentParallelism =
                             (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
                     var newParallelism =
-                            computeScaleTargetParallelism(
+                            jobVertexScaler.computeScaleTargetParallelism(
                                     conf,
                                     v,
                                     metrics,
@@ -205,136 +202,6 @@ public class ScalingExecutor implements Cleanup {
         return out;
     }
 
-    protected int computeScaleTargetParallelism(
-            Configuration conf,
-            JobVertexID vertex,
-            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
-            SortedMap<Instant, ScalingSummary> history) {
-
-        var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
-        double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
-
-        if (Double.isNaN(averageTrueProcessingRate)) {
-            LOG.info(
-                    "True processing rate is not available for {}, cannot compute new parallelism",
-                    vertex);
-            return currentParallelism;
-        }
-
-        double targetCapacity =
-                AutoScalerUtils.getTargetProcessingCapacity(
-                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
-        if (Double.isNaN(targetCapacity)) {
-            LOG.info(
-                    "Target data rate is not available for {}, cannot compute new parallelism",
-                    vertex);
-            return currentParallelism;
-        }
-
-        LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
-        double scaleFactor = targetCapacity / averageTrueProcessingRate;
-        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
-        if (scaleFactor < minScaleFactor) {
-            LOG.info(
-                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
-                    scaleFactor,
-                    vertex,
-                    minScaleFactor);
-            scaleFactor = minScaleFactor;
-        }
-
-        int newParallelism =
-                scale(
-                        currentParallelism,
-                        (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
-                        scaleFactor,
-                        conf.getInteger(VERTEX_MIN_PARALLELISM),
-                        conf.getInteger(VERTEX_MAX_PARALLELISM));
-
-        if (!history.isEmpty()) {
-            if (detectImmediateScaleDownAfterScaleUp(
-                    conf, history, currentParallelism, newParallelism)) {
-                LOG.info(
-                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
-                        vertex,
-                        currentParallelism);
-                newParallelism = currentParallelism;
-            }
-
-            // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
-            // history
-            // currentParallelism 1 => 3 -> empiricalProcRate = 800
-            // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
-        }
-
-        return newParallelism;
-    }
-
-    private boolean detectImmediateScaleDownAfterScaleUp(
-            Configuration conf,
-            SortedMap<Instant, ScalingSummary> history,
-            int currentParallelism,
-            int newParallelism) {
-        var lastScalingTs = history.lastKey();
-        var lastSummary = history.get(lastScalingTs);
-
-        boolean isScaleDown = newParallelism < currentParallelism;
-        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
-
-        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
-
-        boolean withinConfiguredTime =
-                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
-
-        return isScaleDown && lastScaleUp && withinConfiguredTime;
-    }
-
-    public static int scale(
-            int parallelism,
-            int numKeyGroups,
-            double scaleFactor,
-            int minParallelism,
-            int maxParallelism) {
-        Preconditions.checkArgument(
-                minParallelism <= maxParallelism,
-                "The minimum parallelism must not be greater than the maximum parallelism.");
-        if (minParallelism > numKeyGroups) {
-            LOG.warn(
-                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
-                    minParallelism,
-                    numKeyGroups);
-        }
-        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
-            LOG.warn(
-                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
-                    maxParallelism,
-                    numKeyGroups);
-        }
-
-        int newParallelism =
-                // Prevent integer overflow when converting from double to integer.
-                // We do not have to detect underflow because doubles cannot
-                // underflow.
-                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
-
-        // Cap parallelism at either number of key groups or parallelism limit
-        final int upperBound = Math.min(numKeyGroups, maxParallelism);
-
-        // Apply min/max parallelism
-        newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
-
-        // Try to adjust the parallelism such that it divides the number of key groups without a
-        // remainder => state is evenly spread across subtasks
-        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
-            if (numKeyGroups % p == 0) {
-                return p;
-            }
-        }
-
-        // If key group adjustment fails, use originally computed parallelism
-        return newParallelism;
-    }
-
     private void setVertexParallelismOverrides(
             AbstractFlinkResource<?, ?> resource,
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
@@ -362,6 +229,7 @@ public class ScalingExecutor implements Cleanup {
     @VisibleForTesting
     protected void setClock(Clock clock) {
         this.clock = Preconditions.checkNotNull(clock);
+        jobVertexScaler.setClock(clock);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
new file mode 100644
index 00000000..e3a774f7
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for vertex parallelism scaler logic. */
+public class JobVertexScalerTest {
+
+    private JobVertexScaler vertexScaler;
+    private Configuration conf;
+
+    @BeforeEach
+    public void setup() {
+        vertexScaler = new JobVertexScaler();
+        conf = new Configuration();
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+    }
+
+    @Test
+    public void testParallelismScaling() {
+        var op = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+
+        assertEquals(
+                8,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        assertEquals(
+                4,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
+        assertEquals(
+                4,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testParallelismComputation() {
+        final int minParallelism = 1;
+        final int maxParallelism = Integer.MAX_VALUE;
+        assertEquals(1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, maxParallelism));
+        assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, maxParallelism));
+        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, maxParallelism));
+        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, maxParallelism));
+        assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, maxParallelism));
+        assertEquals(
+                720,
+                JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+    }
+
+    @Test
+    public void testParallelismComputationWithLimit() {
+        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700));
+        assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700));
+
+        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
+        assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+
+        assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300));
+        assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+    }
+
+    @Test
+    public void ensureMinParallelismDoesNotExceedMax() {
+        // min parallelism (500) > max parallelism (499) is rejected by scale()'s precondition
+        // check before any parallelism is computed, so there is no result value to assert on.
+        Assert.assertThrows(
+                IllegalArgumentException.class,
+                () -> JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 500, 499));
+    }
+
+    @Test
+    public void testMinParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 100, 500),
+                        Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testMaxParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 500, 100),
+                        Collections.emptySortedMap()));
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate, double catchupRate) {
+        var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
+        metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
+        metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
+        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+        metrics.put(
+                ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
+        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+        return metrics;
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate) {
+        return evaluated(parallelism, target, procRate, 0.);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 9aa67145..e1dc139e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -37,7 +36,6 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -47,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Test for scaling metrics collection logic. */
+/** Test for scaling execution logic. */
 @EnableKubernetesMockClient(crud = true)
 public class ScalingExecutorTest {
 
@@ -172,124 +170,6 @@ public class ScalingExecutorTest {
         assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
     }
 
-    @Test
-    public void testParallelismScaling() {
-        var op = new JobVertexID();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        assertEquals(
-                5,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
-        assertEquals(
-                8,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
-        assertEquals(
-                10,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
-        assertEquals(
-                8,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
-
-        assertEquals(
-                8,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
-        assertEquals(
-                10,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
-        assertEquals(
-                4,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
-        assertEquals(
-                5,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
-
-        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
-        assertEquals(
-                4,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
-    }
-
-    @Test
-    public void testParallelismComputation() {
-        final int minParallelism = 1;
-        final int maxParallelism = Integer.MAX_VALUE;
-        assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism));
-        assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism));
-        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism));
-        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism));
-        assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, maxParallelism));
-        assertEquals(
-                720,
-                ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
-    }
-
-    @Test
-    public void testParallelismComputationWithLimit() {
-        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700));
-        assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700));
-
-        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
-        assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
-
-        assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300));
-        assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600));
-    }
-
-    @Test
-    public void ensureMinParallelismDoesNotExceedMax() {
-        Assert.assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        assertEquals(
-                                600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
-    }
-
-    @Test
-    public void testMinParallelismLimitIsUsed() {
-        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
-        assertEquals(
-                5,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf,
-                        new JobVertexID(),
-                        evaluated(10, 100, 500),
-                        Collections.emptySortedMap()));
-    }
-
-    @Test
-    public void testMaxParallelismLimitIsUsed() {
-        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        assertEquals(
-                10,
-                scalingDecisionExecutor.computeScaleTargetParallelism(
-                        conf,
-                        new JobVertexID(),
-                        evaluated(10, 500, 100),
-                        Collections.emptySortedMap()));
-    }
-
     @Test
     public void testScaleDownAfterScaleUpDetection() throws Exception {
         var op = new JobVertexID();


[flink-kubernetes-operator] 04/04: [hotfix][e2e] Pin busybox version

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jan 12 11:37:59 2023 +0100

    [hotfix][e2e] Pin busybox version
---
 docs/content/docs/custom-resource/pod-template.md | 2 +-
 e2e-tests/data/flinkdep-cr.yaml                   | 2 +-
 examples/pod-template.yaml                        | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/custom-resource/pod-template.md b/docs/content/docs/custom-resource/pod-template.md
index 7dd01354..9a544c9d 100644
--- a/docs/content/docs/custom-resource/pod-template.md
+++ b/docs/content/docs/custom-resource/pod-template.md
@@ -93,7 +93,7 @@ spec:
         initContainers:
           # Sample sidecar container
           - name: busybox
-            image: busybox:latest
+            image: busybox:1.33.1
             command: [ 'sh','-c','echo hello from task manager' ]
   job:
     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 4c50b3a4..8458cbc2 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -44,7 +44,7 @@ spec:
     spec:
       initContainers:
         - name: artifacts-fetcher
-          image: busybox:latest
+          image: busybox:1.33.1
           imagePullPolicy: IfNotPresent
           # Use wget or other tools to get user jars from remote storage
           command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar', '-O', '/flink-artifact/myjob.jar' ]
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index f2b6c6cb..9b01cdf5 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -65,7 +65,7 @@ spec:
         initContainers:
           # Sample init container for fetching remote artifacts
           - name: busybox
-            image: busybox:latest
+            image: busybox:1.33.1
             volumeMounts:
               - mountPath: /opt/flink/downloads
                 name: downloads


[flink-kubernetes-operator] 02/04: [FLINK-30574] Detect ineffective scale up operations to avoid scaling further

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit ed6acd453c6e14f7fdb3295c6f0d0567de6acbd8
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Fri Jan 6 13:54:15 2023 +0100

    [FLINK-30574] Detect ineffective scale up operations to avoid scaling further
---
 .../generated/auto_scaler_configuration.html       | 12 +++
 .../operator/autoscaler/JobAutoScaler.java         |  5 +-
 .../operator/autoscaler/JobVertexScaler.java       | 98 +++++++++++++++++-----
 .../autoscaler/ScalingMetricCollector.java         |  5 +-
 .../operator/autoscaler/ScalingSummary.java        |  6 ++
 .../autoscaler/config/AutoScalerOptions.java       | 14 ++++
 .../operator/autoscaler/metrics/ScalingMetric.java |  5 +-
 .../operator/autoscaler/JobVertexScalerTest.java   | 96 +++++++++++++++++++--
 .../operator/autoscaler/ScalingExecutorTest.java   | 31 +------
 9 files changed, 209 insertions(+), 63 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index af9b4396..f4e5cd9a 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -44,6 +44,18 @@
             <td>Duration</td>
             <td>Period in which no scale down is allowed after a scale up</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scaling.effectiveness.detection.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable detection of ineffective scaling operations and allow the autoscaler to block further scale ups.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scaling.effectiveness.threshold</h5></td>
+            <td style="word-wrap: break-word;">0.1</td>
+            <td>Double</td>
+            <td>Processing rate increase threshold for detecting ineffective scaling operations. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.autoscaler.scaling.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
index cdd03411..4e6ce36e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
@@ -86,7 +86,7 @@ public class JobAutoScaler implements Cleanup {
         }
 
         if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
-            LOG.info("Job autoscaler is waiting for RUNNING job     state");
+            LOG.info("Job autoscaler is waiting for RUNNING job state");
             return false;
         }
 
@@ -98,8 +98,7 @@ public class JobAutoScaler implements Cleanup {
                     metricsCollector.updateMetrics(
                             resource, autoScalerInfo, ctx.getFlinkService(), conf);
 
-            if (collectedMetrics == null || collectedMetrics.getMetricHistory().isEmpty()) {
-                LOG.info("No metrics were collected. Skipping scaling step");
+            if (collectedMetrics.getMetricHistory().isEmpty()) {
                 return false;
             }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 7b21a244..4ebabf47 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
@@ -29,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Clock;
-import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Map;
@@ -40,6 +40,7 @@ import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
@@ -59,7 +60,6 @@ public class JobVertexScaler {
 
         var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
         double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
-
         if (Double.isNaN(averageTrueProcessingRate)) {
             LOG.info(
                     "True processing rate is not available for {}, cannot compute new parallelism",
@@ -97,42 +97,98 @@ public class JobVertexScaler {
                         conf.getInteger(VERTEX_MIN_PARALLELISM),
                         conf.getInteger(VERTEX_MAX_PARALLELISM));
 
-        if (!history.isEmpty()) {
-            if (detectImmediateScaleDownAfterScaleUp(
-                    conf, history, currentParallelism, newParallelism)) {
-                LOG.info(
-                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+        if (newParallelism == currentParallelism
+                || blockScalingBasedOnPastActions(
                         vertex,
-                        currentParallelism);
-                newParallelism = currentParallelism;
-            }
-
-            // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
-            // history
-            // currentParallelism 1 => 3 -> empiricalProcRate = 800
-            // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+                        conf,
+                        evaluatedMetrics,
+                        history,
+                        currentParallelism,
+                        newParallelism)) {
+            return currentParallelism;
         }
 
+        // We record our expectations for this scaling operation
+        evaluatedMetrics.put(
+                ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(targetCapacity));
         return newParallelism;
     }
 
-    private boolean detectImmediateScaleDownAfterScaleUp(
+    private boolean blockScalingBasedOnPastActions(
+            JobVertexID vertex,
             Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             int currentParallelism,
             int newParallelism) {
+
+        // If we don't have past scaling actions for this vertex, there is nothing to do
+        if (history.isEmpty()) {
+            return false;
+        }
+
+        boolean scaledUp = currentParallelism < newParallelism;
         var lastScalingTs = history.lastKey();
         var lastSummary = history.get(lastScalingTs);
 
-        boolean isScaleDown = newParallelism < currentParallelism;
-        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+        if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
+            if (scaledUp) {
+                return detectIneffectiveScaleUp(vertex, conf, evaluatedMetrics, lastSummary);
+            } else {
+                return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
+            }
+        }
+        return false;
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            JobVertexID vertex, Configuration conf, Instant lastScalingTs) {
 
         var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+        if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
+            LOG.info(
+                    "Skipping immediate scale down after scale up within grace period for {}",
+                    vertex);
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-        boolean withinConfiguredTime =
-                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+    private boolean detectIneffectiveScaleUp(
+            JobVertexID vertex,
+            Configuration conf,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            ScalingSummary lastSummary) {
+
+        double lastProcRate = lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+        double lastExpectedProcRate =
+                lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent();
+        var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        // To judge the effectiveness of the scale up operation we compute how much of the expected
+        // increase actually happened. For example if we expect a 100 increase in proc rate and only
+        // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
+        // below the threshold, we mark the scaling ineffective.
+        double expectedIncrease = lastExpectedProcRate - lastProcRate;
+        double actualIncrease = currentProcRate - lastProcRate;
+
+        boolean withinEffectiveThreshold =
+                (actualIncrease / expectedIncrease)
+                        >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
+        if (withinEffectiveThreshold) {
+            return false;
+        }
 
-        return isScaleDown && lastScaleUp && withinConfiguredTime;
+        // TODO: Trigger kube event
+
+        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
+            LOG.info(
+                    "Skipping further scale up after ineffective previous scale up for {}", vertex);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     @VisibleForTesting
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index c7674a5b..58e7fd9f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -59,6 +59,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -102,6 +103,7 @@ public abstract class ScalingMetricCollector implements Cleanup {
         var stableTime = currentJobUpdateTs.plus(stabilizationDuration);
         if (now.isBefore(stableTime)) {
             // As long as we are stabilizing, collect no metrics at all
+            LOG.info("Skipping metric collection during stabilization period until {}", stableTime);
             return new CollectedMetrics(topology, Collections.emptySortedMap());
         }
 
@@ -120,7 +122,7 @@ public abstract class ScalingMetricCollector implements Cleanup {
                             if (h == null) {
                                 h = scalingInformation.getMetricHistory();
                             }
-                            return h.tailMap(now.minus(metricsWindowSize));
+                            return new TreeMap<>(h.tailMap(now.minus(metricsWindowSize)));
                         });
 
         // The filtered list of metrics we want to query for each vertex
@@ -141,6 +143,7 @@ public abstract class ScalingMetricCollector implements Cleanup {
         if (now.isBefore(stableTime.plus(conf.get(AutoScalerOptions.METRICS_WINDOW)))) {
             // As long as we haven't had time to collect a full window,
             // collect metrics but do not return any metrics
+            LOG.info("Waiting until initial metric window is full before starting scaling");
             return new CollectedMetrics(topology, Collections.emptySortedMap());
         }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
index 7def1716..4ff7a6e9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -48,4 +49,9 @@ public class ScalingSummary {
         this.newParallelism = newParallelism;
         this.metrics = metrics;
     }
+
+    @JsonIgnore
+    public boolean isScaledUp() {
+        return newParallelism > currentParallelism;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index d9a0b86b..721d85dc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -118,4 +118,18 @@ public class AutoScalerOptions {
                     .defaultValue(Duration.ofMinutes(5))
                     .withDescription(
                             "Expected restart time to be used until the operator can determine it reliably from history.");
+
+    public static final ConfigOption<Boolean> SCALING_EFFECTIVENESS_DETECTION_ENABLED =
+            autoScalerConfig("scaling.effectiveness.detection.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enable detection of ineffective scaling operations and allowing the autoscaler to block further scale ups.");
+
+    public static final ConfigOption<Double> SCALING_EFFECTIVENESS_THRESHOLD =
+            autoScalerConfig("scaling.effectiveness.threshold")
+                    .doubleType()
+                    .defaultValue(0.1)
+                    .withDescription(
+                            "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index d3103971..aad95d39 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -60,7 +60,10 @@ public enum ScalingMetric {
     SCALE_UP_RATE_THRESHOLD(false),
 
     /** Lower boundary of the target data rate range. */
-    SCALE_DOWN_RATE_THRESHOLD(false);
+    SCALE_DOWN_RATE_THRESHOLD(false),
+
+    /** Expected true processing rate after scale up. */
+    EXPECTED_PROCESSING_RATE(false);
 
     private final boolean calculateAverage;
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index e3a774f7..34c2b8cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -27,12 +27,17 @@ import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for vertex parallelism scaler logic. */
 public class JobVertexScalerTest {
@@ -166,21 +171,98 @@ public class JobVertexScalerTest {
                         Collections.emptySortedMap()));
     }
 
+    @Test
+    public void testScaleDownAfterScaleUpDetection() {
+        var op = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1));
+        var clock = Clock.systemDefaultZone();
+        vertexScaler.setClock(clock);
+
+        var evaluated = evaluated(5, 100, 50);
+        var history = new TreeMap<Instant, ScalingSummary>();
+        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+
+        history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
+
+        // Should not allow scale back down immediately
+        evaluated = evaluated(10, 50, 100);
+        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+
+        // Pass some time...
+        clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
+        vertexScaler.setClock(clock);
+
+        assertEquals(5, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
+
+        // Allow immediate scale up
+        evaluated = evaluated(5, 100, 50);
+        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
+    }
+
+    @Test
+    public void testIneffectiveScalingDetection() {
+        var op = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+
+        var evaluated = evaluated(5, 100, 50);
+        var history = new TreeMap<Instant, ScalingSummary>();
+        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
+        history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
+
+        // Allow to scale higher if scaling was effective (80%)
+        evaluated = evaluated(10, 180, 90);
+        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
+        history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
+
+        // Detect ineffective scaling, less than 5% of target increase (instead of 90 -> 180, only
+        // 90 -> 94). Do not try to scale above 20.
+        evaluated = evaluated(20, 180, 94);
+        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+
+        // Still considered ineffective (less than 10%)
+        evaluated = evaluated(20, 180, 98);
+        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+
+        // Allow scale up if current parallelism doesn't match last (user rescaled manually)
+        evaluated = evaluated(10, 180, 90);
+        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+
+        // Over 10%, effective
+        evaluated = evaluated(20, 180, 100);
+        assertEquals(36, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+
+        // Ineffective but detection is turned off
+        conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
+        evaluated = evaluated(20, 180, 90);
+        assertEquals(40, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+        conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
+
+        // Allow scale down even if ineffective
+        evaluated = evaluated(20, 45, 90);
+        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history));
+        assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+    }
+
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
-            int parallelism, double target, double procRate, double catchupRate) {
+            int parallelism, double target, double procRate) {
         var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
         metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
         metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
         metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
-        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(0.));
         metrics.put(
                 ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
         ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
         return metrics;
     }
-
-    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
-            int parallelism, double target, double procRate) {
-        return evaluated(parallelism, target, procRate, 0.);
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index e1dc139e..7f9fd458 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -74,6 +73,7 @@ public class ScalingExecutorTest {
     @Test
     public void testStabilizationPeriod() throws Exception {
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMinutes(1));
+        conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
 
         var metrics = Map.of(new JobVertexID(), evaluated(1, 110, 100));
 
@@ -170,35 +170,6 @@ public class ScalingExecutorTest {
         assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
     }
 
-    @Test
-    public void testScaleDownAfterScaleUpDetection() throws Exception {
-        var op = new JobVertexID();
-        var scalingInfo = new AutoScalerInfo(new HashMap<>());
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
-        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1));
-
-        scalingDecisionExecutor.scaleResource(
-                flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
-        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
-
-        // Should now allow scale back down immediately
-        scalingDecisionExecutor.scaleResource(
-                flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
-        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
-
-        // Pass some time...
-        var clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
-        scalingDecisionExecutor.setClock(clock);
-        scalingDecisionExecutor.scaleResource(
-                flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
-        assertEquals(Map.of(op, 5), getScaledParallelism(flinkDep));
-
-        // Allow immediate scale up
-        scalingDecisionExecutor.scaleResource(
-                flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
-        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
-    }
-
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
             int parallelism, double target, double procRate, double catchupRate) {
         var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();