You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:15:59 UTC

[flink-kubernetes-operator] branch main updated (c8ed11df -> 86458aac)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from c8ed11df [FLINK-28875] Add FlinkSessionJobControllerTest
     new 8d285fe1 [FLINK-30260][autoscaler] Define scaling metrics
     new 5aa9956e [FLINK-30260][autoscaler] Utilities for collecting and computing metrics
     new d535333a [FLINK-30260][autoscaler] Collect and compute scaling metrics through Flink REST API
     new 0b3cc131 [FLINK-30260][autoscaler] Add scaling metric evaluation logic
     new 37bf5107 [FLINK-30260][autoscaler] Add scaling execution logic
     new 53b1639a [FLINK-30260][autoscaler] Add JobAutoScaler component and configs
     new 4408c665 [FLINK-30260][autoscaler] Integrate autoscaler components with reconciler mechanism
     new c5e437f3 [FLINK-30260][autoscaler] Autoscaler example application
     new 86458aac [docs][autoscaler] Add docs page + enable config doc generation for Autoscaler options

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml                           |   2 +-
 docs/content/docs/custom-resource/autoscaler.md    | 158 ++++++++
 docs/content/docs/custom-resource/reference.md     |   1 +
 docs/content/docs/operations/configuration.md      |   6 +
 .../generated/auto_scaler_configuration.html       |  90 +++++
 .../kustomization.yaml => autoscaling/Dockerfile}  |   9 +-
 .../autoscaling.yaml}                              |  37 +-
 .../pom.xml                                        |  91 ++++-
 .../main/java/autoscaling/AutoscalingExample.java  |  40 ++
 .../src/main/resources/log4j2.properties           |   8 +-
 .../configuration/ConfigOptionsDocGenerator.java   |   6 +-
 .../kubernetes/operator/api/spec/FlinkVersion.java |   3 +-
 .../flink/kubernetes/operator/FlinkOperator.java   |  14 +-
 .../operator/autoscaler/AutoScalerInfo.java        | 189 +++++++++
 .../Cleanup.java}                                  |  18 +-
 .../operator/autoscaler/JobAutoScaler.java         | 204 ++++++++++
 .../autoscaler/RestApiMetricsCollector.java        | 100 +++++
 .../operator/autoscaler/ScalingExecutor.java       | 362 +++++++++++++++++
 .../autoscaler/ScalingMetricCollector.java         | 443 +++++++++++++++++++++
 .../autoscaler/ScalingMetricEvaluator.java         | 228 +++++++++++
 .../operator/autoscaler/ScalingSummary.java        |  51 +++
 .../autoscaler/config/AutoScalerOptions.java       | 121 ++++++
 .../autoscaler/metrics/CollectedMetrics.java       |  25 +-
 .../autoscaler/metrics/EvaluatedScalingMetric.java |  21 +-
 .../operator/autoscaler/metrics/FlinkMetric.java   |  49 +++
 .../operator/autoscaler/metrics/ScalingMetric.java |  74 ++++
 .../autoscaler/metrics/ScalingMetrics.java         | 159 ++++++++
 .../operator/autoscaler/topology/JobTopology.java  | 154 +++++++
 .../topology/VertexInfo.java}                      |  22 +-
 .../operator/autoscaler/utils/AutoScalerUtils.java |  70 ++++
 .../autoscaler/utils/JobVertexSerDeModule.java     |  54 +++
 .../AbstractFlinkResourceReconciler.java           |  13 +-
 .../deployment/AbstractJobReconciler.java          |   6 +-
 .../deployment/ApplicationReconciler.java          |   6 +-
 .../reconciler/deployment/ReconcilerFactory.java   |  12 +-
 .../reconciler/deployment/SessionReconciler.java   |   6 +-
 .../sessionjob/SessionJobReconciler.java           |   6 +-
 .../operator/service/AbstractFlinkService.java     |   4 +-
 .../kubernetes/operator/service/FlinkService.java  |   3 +
 .../operator/utils/KubernetesClientUtils.java      |  19 +
 .../rest/messages/job/metrics/IOMetricsInfo.java   | 191 +++++++++
 .../kubernetes/operator/TestingFlinkService.java   |   2 +-
 .../autoscaler/BacklogBasedScalingTest.java        | 352 ++++++++++++++++
 .../operator/autoscaler/JobTopologyTest.java       |  99 +++++
 .../MetricsCollectionAndEvaluationTest.java        | 251 ++++++++++++
 .../operator/autoscaler/ScalingExecutorTest.java   | 350 ++++++++++++++++
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 227 +++++++++++
 .../autoscaler/TestingMetricsCollector.java        |  79 ++++
 .../autoscaler/metrics/ScalingMetricsTest.java     | 178 +++++++++
 .../TestingFlinkDeploymentController.java          |   5 +-
 .../TestingFlinkSessionJobController.java          |   5 +-
 .../sessionjob/FlinkSessionJobObserverTest.java    |   3 +-
 .../deployment/ApplicationReconcilerTest.java      |   3 +-
 .../ApplicationReconcilerUpgradeModeTest.java      |   3 +-
 .../deployment/SessionReconcilerTest.java          |   9 +-
 .../sessionjob/SessionJobReconcilerTest.java       |  12 +-
 .../operator/service/NativeFlinkServiceTest.java   |   2 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |   1 +
 pom.xml                                            |   1 +
 59 files changed, 4538 insertions(+), 119 deletions(-)
 create mode 100644 docs/content/docs/custom-resource/autoscaler.md
 create mode 100644 docs/layouts/shortcodes/generated/auto_scaler_configuration.html
 copy examples/{kustomize/sidecar/kustomization.yaml => autoscaling/Dockerfile} (87%)
 copy examples/{basic-checkpoint-ha.yaml => autoscaling/autoscaling.yaml} (69%)
 copy examples/{flink-sql-runner-example => autoscaling}/pom.xml (57%)
 create mode 100644 examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
 copy {flink-kubernetes-webhook => examples/autoscaling}/src/main/resources/log4j2.properties (86%)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
 copy flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/{exception/StatusConflictException.java => autoscaler/Cleanup.java} (68%)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 copy flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobState.java => flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java (62%)
 copy flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/Resource.java => flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java (73%)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
 copy flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/{metrics/CustomResourceMetrics.java => autoscaler/topology/VertexInfo.java} (71%)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java


[flink-kubernetes-operator] 01/09: [FLINK-30260][autoscaler] Define scaling metrics

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 8d285fe1397c24d740ac7baa85e9bc22eb03fff6
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Dec 1 08:47:23 2022 +0100

    [FLINK-30260][autoscaler] Define scaling metrics
---
 .../autoscaler/metrics/CollectedMetrics.java       |  34 ++++
 .../autoscaler/metrics/EvaluatedScalingMetric.java |  36 +++++
 .../operator/autoscaler/metrics/ScalingMetric.java |  74 +++++++++
 .../autoscaler/metrics/ScalingMetrics.java         | 159 ++++++++++++++++++
 .../autoscaler/metrics/ScalingMetricsTest.java     | 178 +++++++++++++++++++++
 5 files changed, 481 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
new file mode 100644
index 00000000..60f1c410
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.SortedMap;
+
+/** Topology and collected metric history. */
+@Value
+public class CollectedMetrics {
+    JobTopology jobTopology;
+    SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory;
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
new file mode 100644
index 00000000..a2304128
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Evaluated scaling metric. */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class EvaluatedScalingMetric {
+    private double current;
+
+    private double average;
+
+    public static EvaluatedScalingMetric of(double value) {
+        return new EvaluatedScalingMetric(value, Double.NaN);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
new file mode 100644
index 00000000..d3103971
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+/**
+ * Supported scaling metrics. These represent high level metrics computed from Flink job metrics
+ * that are used for scaling decisions in the autoscaler module.
+ */
+public enum ScalingMetric {
+
+    /** Max subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
+    LOAD_MAX(true),
+
+    /** Average subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
+    LOAD_AVG(true),
+
+    /** Processing rate at full capacity (records/sec). */
+    TRUE_PROCESSING_RATE(true),
+
+    /** Output rate at full capacity (records/sec). */
+    TRUE_OUTPUT_RATE(true),
+
+    /**
+     * Incoming data rate to the source, e.g. rate of records written to the Kafka topic
+     * (records/sec).
+     */
+    SOURCE_DATA_RATE(true),
+
+    /** Target processing rate of operators as derived from source inputs (records/sec). */
+    TARGET_DATA_RATE(true),
+
+    /** Target processing rate of operators as derived from backlog (records/sec). */
+    CATCH_UP_DATA_RATE(false),
+
+    /** Number of outputs produced on average for every input record. */
+    OUTPUT_RATIO(true),
+
+    /** Total number of pending records. */
+    LAG(false),
+    /** Job vertex parallelism. */
+    PARALLELISM(false),
+    /** Job vertex max parallelism. */
+    MAX_PARALLELISM(false),
+    /** Upper boundary of the target data rate range. */
+    SCALE_UP_RATE_THRESHOLD(false),
+
+    /** Lower boundary of the target data rate range. */
+    SCALE_DOWN_RATE_THRESHOLD(false);
+
+    private final boolean calculateAverage;
+
+    ScalingMetric(boolean calculateAverage) {
+        this.calculateAverage = calculateAverage;
+    }
+
+    public boolean isCalculateAverage() {
+        return calculateAverage;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
new file mode 100644
index 00000000..27b3d97d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
+
+/** Utilities for computing scaling metrics based on Flink metrics. */
+public class ScalingMetrics {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetrics.class);
+
+    public static void computeLoadMetrics(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics) {
+
+        var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
+        if (busyTime == null) {
+            return;
+        }
+
+        if (!busyTime.getAvg().isNaN()) {
+            scalingMetrics.put(ScalingMetric.LOAD_AVG, busyTime.getAvg() / 1000);
+        }
+
+        if (!busyTime.getMax().isNaN()) {
+            scalingMetrics.put(ScalingMetric.LOAD_MAX, busyTime.getMax() / 1000);
+        }
+    }
+
+    public static void computeDataRateMetrics(
+            JobVertexID jobVertexID,
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            JobTopology topology,
+            Optional<Double> lagGrowthOpt,
+            Configuration conf) {
+
+        var source = topology.getInputs().get(jobVertexID).isEmpty();
+        var sink = topology.getOutputs().get(jobVertexID).isEmpty();
+
+        var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
+
+        if (busyTime == null || busyTime.getAvg().isNaN()) {
+            LOG.error("Cannot compute true processing/output rate without busyTimeMsPerSecond");
+            return;
+        }
+
+        var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        if (numRecordsInPerSecond == null) {
+            numRecordsInPerSecond =
+                    flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
+        }
+
+        var outputPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+
+        double busyTimeMultiplier = 1000 / busyTime.getAvg();
+
+        if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
+            double sourceInputRate =
+                    numRecordsInPerSecond != null ? numRecordsInPerSecond.getSum() : Double.NaN;
+
+            double targetDataRate;
+            if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) {
+                targetDataRate = sourceInputRate;
+            } else {
+                // If source in metric is not available (maybe legacy source) we use source
+                // output that should always be available
+                targetDataRate =
+                        flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
+            }
+            scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
+            scalingMetrics.put(
+                    ScalingMetric.OUTPUT_RATIO, outputPerSecond.getSum() / targetDataRate);
+            var trueOutputRate = busyTimeMultiplier * outputPerSecond.getSum();
+            scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate);
+            scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate);
+            LOG.info(
+                    "Scaling disabled for source {} using output rate {} as target",
+                    jobVertexID,
+                    trueOutputRate);
+        } else {
+            if (source) {
+                if (!lagGrowthOpt.isPresent() || numRecordsInPerSecond.getSum().isNaN()) {
+                    LOG.error(
+                            "Cannot compute source target data rate without numRecordsInPerSecond and pendingRecords (lag) metric for {}.",
+                            jobVertexID);
+                    scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, Double.NaN);
+                } else {
+                    double sourceDataRate =
+                            Math.max(0, numRecordsInPerSecond.getSum() + lagGrowthOpt.get());
+                    LOG.info(
+                            "Using computed source data rate {} for {}",
+                            sourceDataRate,
+                            jobVertexID);
+                    scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE, sourceDataRate);
+                }
+            }
+
+            if (!numRecordsInPerSecond.getSum().isNaN()) {
+                double trueProcessingRate = busyTimeMultiplier * numRecordsInPerSecond.getSum();
+                if (trueProcessingRate <= 0 || !Double.isFinite(trueProcessingRate)) {
+                    trueProcessingRate = Double.NaN;
+                }
+                scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, trueProcessingRate);
+            } else {
+                LOG.error("Cannot compute true processing rate without numRecordsInPerSecond");
+            }
+
+            if (!sink) {
+                if (!outputPerSecond.getSum().isNaN()) {
+                    scalingMetrics.put(
+                            ScalingMetric.OUTPUT_RATIO,
+                            outputPerSecond.getSum() / numRecordsInPerSecond.getSum());
+                    scalingMetrics.put(
+                            ScalingMetric.TRUE_OUTPUT_RATE,
+                            busyTimeMultiplier * outputPerSecond.getSum());
+                } else {
+                    LOG.error(
+                            "Cannot compute processing and input rate without numRecordsOutPerSecond");
+                }
+            }
+        }
+    }
+
+    public static void computeLagMetrics(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics) {
+        var pendingRecords = flinkMetrics.get(FlinkMetric.PENDING_RECORDS);
+        if (pendingRecords != null) {
+            scalingMetrics.put(ScalingMetric.LAG, pendingRecords.getSum());
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
new file mode 100644
index 00000000..5aa84b1c
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for scaling metrics computation logic. */
+public class ScalingMetricsTest {
+
+    @Test
+    public void testProcessingAndOutputMetrics() {
+        var source = new JobVertexID();
+        var op = new JobVertexID();
+        var sink = new JobVertexID();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(op, Set.of(source), 1, 1),
+                        new VertexInfo(sink, Set.of(op), 1, 1));
+
+        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+        ScalingMetrics.computeDataRateMetrics(
+                source,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
+                scalingMetrics,
+                topology,
+                Optional.of(15.),
+                new Configuration());
+
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        10000.,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        20000.,
+                        ScalingMetric.OUTPUT_RATIO,
+                        2.,
+                        ScalingMetric.SOURCE_DATA_RATE,
+                        1015.),
+                scalingMetrics);
+
+        // test negative lag growth (catch up)
+        scalingMetrics.clear();
+        ScalingMetrics.computeDataRateMetrics(
+                source,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
+                scalingMetrics,
+                topology,
+                Optional.of(-50.),
+                new Configuration());
+
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        10000.,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        20000.,
+                        ScalingMetric.OUTPUT_RATIO,
+                        2.,
+                        ScalingMetric.SOURCE_DATA_RATE,
+                        950.),
+                scalingMetrics);
+
+        scalingMetrics.clear();
+        ScalingMetrics.computeDataRateMetrics(
+                op,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
+                scalingMetrics,
+                topology,
+                Optional.empty(),
+                new Configuration());
+
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        10000.,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        20000.,
+                        ScalingMetric.OUTPUT_RATIO,
+                        2.),
+                scalingMetrics);
+    }
+
+    @Test
+    public void testSourceScalingDisabled() {
+        var source = new JobVertexID();
+
+        var topology = new JobTopology(new VertexInfo(source, Collections.emptySet(), 1, 1));
+
+        Configuration conf = new Configuration();
+        // Disable scaling sources
+        conf.setBoolean(AutoScalerOptions.SOURCE_SCALING_ENABLED, false);
+
+        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+        ScalingMetrics.computeDataRateMetrics(
+                source,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 4000.)),
+                scalingMetrics,
+                topology,
+                Optional.empty(),
+                conf);
+
+        // Sources are not scaled, the rates are solely computed on the basis of the true output
+        // rate
+        assertEquals(Double.NaN, scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
+        assertEquals(8000, scalingMetrics.get(ScalingMetric.TARGET_DATA_RATE));
+        assertEquals(8000, scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
+        assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO));
+    }
+
+    @Test
+    public void testLoadMetrics() {
+        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+        ScalingMetrics.computeLoadMetrics(
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, 200., 100., Double.NaN)),
+                scalingMetrics);
+
+        assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX));
+        assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG));
+    }
+}


[flink-kubernetes-operator] 05/09: [FLINK-30260][autoscaler] Add scaling execution logic

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 37bf5107a957110cfa4289858f32a0baa00d2dbf
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 09:00:40 2022 +0100

    [FLINK-30260][autoscaler] Add scaling execution logic
---
 .../operator/autoscaler/ScalingExecutor.java       | 362 +++++++++++++++++++++
 .../operator/autoscaler/ScalingSummary.java        |  51 +++
 .../operator/autoscaler/ScalingExecutorTest.java   | 350 ++++++++++++++++++++
 3 files changed, 763 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
new file mode 100644
index 00000000..afd9d2fc
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Class responsible for executing scaling decisions. */
+public class ScalingExecutor implements Cleanup {
+
+    public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
+            ConfigOptions.key("pipeline.jobvertex-parallelism-overrides")
+                    .mapType()
+                    .defaultValue(Collections.emptyMap())
+                    .withDescription(
+                            "A parallelism override map (jobVertexId -> parallelism) which will be used to update"
+                                    + " the parallelism of the corresponding job vertices of submitted JobGraphs.");
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
+
+    private final KubernetesClient kubernetesClient;
+
+    private Clock clock = Clock.system(ZoneId.systemDefault());
+
+    public ScalingExecutor(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    public boolean scaleResource(
+            AbstractFlinkResource<?, ?> resource,
+            AutoScalerInfo scalingInformation,
+            Configuration conf,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics)
+            throws Exception {
+
+        if (!conf.get(SCALING_ENABLED)) {
+            return false;
+        }
+
+        if (!stabilizationPeriodPassed(resource, conf)) {
+            return false;
+        }
+
+        var scalingHistory = scalingInformation.getScalingHistory();
+        var scalingSummaries = computeScalingSummary(conf, evaluatedMetrics, scalingHistory);
+        if (scalingSummaries.isEmpty()) {
+            LOG.info("All job vertices are currently running at their target parallelism.");
+            return false;
+        }
+
+        if (allVerticesWithinUtilizationTarget(evaluatedMetrics, scalingSummaries)) {
+            return false;
+        }
+
+        LOG.info("Scaling vertices:");
+        scalingSummaries.forEach(
+                (v, s) ->
+                        LOG.info(
+                                "{} | Parallelism {} -> {}",
+                                v,
+                                s.getCurrentParallelism(),
+                                s.getNewParallelism()));
+
+        setVertexParallelismOverrides(resource, evaluatedMetrics, scalingSummaries);
+
+        KubernetesClientUtils.replaceSpecAfterScaling(kubernetesClient, resource);
+        scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries);
+
+        return true;
+    }
+
+    private boolean stabilizationPeriodPassed(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+        var now = clock.instant();
+        var startTs =
+                Instant.ofEpochMilli(
+                        Long.parseLong(resource.getStatus().getJobStatus().getStartTime()));
+        var stableTime = startTs.plus(conf.get(STABILIZATION_INTERVAL));
+
+        if (stableTime.isAfter(now)) {
+            LOG.info("Waiting until {} to stabilize before new scale operation.", stableTime);
+            return false;
+        }
+        return true;
+    }
+
+    protected static boolean allVerticesWithinUtilizationTarget(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries) {
+
+        for (Map.Entry<JobVertexID, ScalingSummary> entry : scalingSummaries.entrySet()) {
+            var vertex = entry.getKey();
+            var scalingSummary = entry.getValue();
+            var metrics = evaluatedMetrics.get(vertex);
+
+            double processingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+            double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+            double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
+            if (processingRate < scaleUpRateThreshold || processingRate > scaleDownRateThreshold) {
+                LOG.info(
+                        "Vertex {}(pCurr={}, pNew={}) processing rate {} is outside ({}, {})",
+                        vertex,
+                        scalingSummary.getCurrentParallelism(),
+                        scalingSummary.getNewParallelism(),
+                        processingRate,
+                        scaleUpRateThreshold,
+                        scaleDownRateThreshold);
+                return false;
+            } else {
+                LOG.debug(
+                        "Vertex {}(pCurr={}, pNew={}) processing rate {} is within target ({}, {})",
+                        vertex,
+                        scalingSummary.getCurrentParallelism(),
+                        scalingSummary.getNewParallelism(),
+                        processingRate,
+                        scaleUpRateThreshold,
+                        scaleDownRateThreshold);
+            }
+        }
+        LOG.info("All vertex processing rates are within target.");
+        return true;
+    }
+
+    private Map<JobVertexID, ScalingSummary> computeScalingSummary(
+            Configuration conf,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
+
+        var out = new HashMap<JobVertexID, ScalingSummary>();
+        evaluatedMetrics.forEach(
+                (v, metrics) -> {
+                    var currentParallelism =
+                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
+                    var newParallelism =
+                            computeScaleTargetParallelism(
+                                    conf,
+                                    v,
+                                    metrics,
+                                    scalingHistory.getOrDefault(v, Collections.emptySortedMap()));
+                    if (currentParallelism != newParallelism) {
+                        out.put(v, new ScalingSummary(currentParallelism, newParallelism, metrics));
+                    }
+                });
+
+        return out;
+    }
+
+    protected int computeScaleTargetParallelism(
+            Configuration conf,
+            JobVertexID vertex,
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            SortedMap<Instant, ScalingSummary> history) {
+
+        var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
+        double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+        if (Double.isNaN(averageTrueProcessingRate)) {
+            LOG.info(
+                    "True processing rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        double targetCapacity =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
+        if (Double.isNaN(targetCapacity)) {
+            LOG.info(
+                    "Target data rate is not available for {}, cannot compute new parallelism",
+                    vertex);
+            return currentParallelism;
+        }
+
+        LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
+        double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        if (scaleFactor < minScaleFactor) {
+            LOG.info(
+                    "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+                    scaleFactor,
+                    vertex,
+                    minScaleFactor);
+            scaleFactor = minScaleFactor;
+        }
+
+        int newParallelism =
+                scale(
+                        currentParallelism,
+                        (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+                        scaleFactor,
+                        conf.getInteger(VERTEX_MIN_PARALLELISM),
+                        conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+        if (!history.isEmpty()) {
+            if (detectImmediateScaleDownAfterScaleUp(
+                    conf, history, currentParallelism, newParallelism)) {
+                LOG.info(
+                        "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+                        vertex,
+                        currentParallelism);
+                newParallelism = currentParallelism;
+            }
+
+            // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
+            // history
+            // currentParallelism 1 => 3 -> empiricalProcRate = 800
+            // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+        }
+
+        return newParallelism;
+    }
+
+    private boolean detectImmediateScaleDownAfterScaleUp(
+            Configuration conf,
+            SortedMap<Instant, ScalingSummary> history,
+            int currentParallelism,
+            int newParallelism) {
+        var lastScalingTs = history.lastKey();
+        var lastSummary = history.get(lastScalingTs);
+
+        boolean isScaleDown = newParallelism < currentParallelism;
+        boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+
+        var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+
+        boolean withinConfiguredTime =
+                Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+
+        return isScaleDown && lastScaleUp && withinConfiguredTime;
+    }
+
+    public static int scale(
+            int parallelism,
+            int numKeyGroups,
+            double scaleFactor,
+            int minParallelism,
+            int maxParallelism) {
+        Preconditions.checkArgument(
+                minParallelism <= maxParallelism,
+                "The minimum parallelism must not be greater than the maximum parallelism.");
+        if (minParallelism > numKeyGroups) {
+            LOG.warn(
+                    "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
+                    minParallelism,
+                    numKeyGroups);
+        }
+        if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
+            LOG.warn(
+                    "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
+                    maxParallelism,
+                    numKeyGroups);
+        }
+
+        int newParallelism =
+                // Prevent integer overflow when converting from double to integer.
+                // We do not have to detect underflow because doubles cannot
+                // underflow.
+                (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
+
+        // Cap parallelism at either number of key groups or parallelism limit
+        final int upperBound = Math.min(numKeyGroups, maxParallelism);
+
+        // Apply min/max parallelism
+        newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
+
+        // Try to adjust the parallelism such that it divides the number of key groups without a
+        // remainder => state is evenly spread across subtasks
+        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+            if (numKeyGroups % p == 0) {
+                return p;
+            }
+        }
+
+        // If key group adjustment fails, use originally computed parallelism
+        return newParallelism;
+    }
+
+    private void setVertexParallelismOverrides(
+            AbstractFlinkResource<?, ?> resource,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> summaries) {
+        var flinkConf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
+        var overrides = new HashMap<String, String>();
+        evaluatedMetrics.forEach(
+                (id, metrics) -> {
+                    if (summaries.containsKey(id)) {
+                        overrides.put(
+                                id.toHexString(),
+                                String.valueOf(summaries.get(id).getNewParallelism()));
+                    } else {
+                        overrides.put(
+                                id.toHexString(),
+                                String.valueOf(
+                                        (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent()));
+                    }
+                });
+        flinkConf.set(PARALLELISM_OVERRIDES, overrides);
+
+        resource.getSpec().setFlinkConfiguration(flinkConf.toMap());
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+
+    @Override
+    public void cleanup(AbstractFlinkResource<?, ?> cr) {
+        // No cleanup is currently necessary
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
new file mode 100644
index 00000000..7def1716
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */
+@Data
+@NoArgsConstructor
+public class ScalingSummary {
+
+    private int currentParallelism;
+
+    private int newParallelism;
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> metrics;
+
+    public ScalingSummary(
+            int currentParallelism,
+            int newParallelism,
+            Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
+        if (currentParallelism == newParallelism) {
+            throw new IllegalArgumentException(
+                    "Current parallelism should not be equal to newParallelism during scaling.");
+        }
+        this.currentParallelism = currentParallelism;
+        this.newParallelism = newParallelism;
+        this.metrics = metrics;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
new file mode 100644
index 00000000..e819fd89
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for scaling metrics collection logic. */
+@EnableKubernetesMockClient(crud = true)
+public class ScalingExecutorTest {
+
+    private ScalingExecutor scalingDecisionExecutor;
+    private Configuration conf;
+    private KubernetesClient kubernetesClient;
+    private FlinkDeployment flinkDep;
+
+    @BeforeEach
+    public void setup() {
+        scalingDecisionExecutor = new ScalingExecutor(kubernetesClient);
+        conf = new Configuration();
+        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+
+        flinkDep = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(flinkDep).createOrReplace();
+        flinkDep.getStatus()
+                .getJobStatus()
+                .setStartTime(String.valueOf(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testStabilizationPeriod() throws Exception {
+        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMinutes(1));
+
+        var metrics = Map.of(new JobVertexID(), evaluated(1, 110, 100));
+
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+        var clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
+        flinkDep.getStatus()
+                .getJobStatus()
+                .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(30));
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(20));
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(20));
+        scalingDecisionExecutor.setClock(clock);
+        assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        flinkDep.getStatus()
+                .getJobStatus()
+                .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(59));
+        scalingDecisionExecutor.setClock(clock);
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+        clock = Clock.offset(clock, Duration.ofSeconds(2));
+        scalingDecisionExecutor.setClock(clock);
+        assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+    }
+
+    @Test
+    public void testUtilizationBoundaries() {
+        // Restart time should not affect utilization boundary
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+
+        var flinkDep = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(flinkDep).createOrReplace();
+
+        var op1 = new JobVertexID();
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+        var evaluated = Map.of(op1, evaluated(1, 70, 100));
+        var scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
+        assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
+        evaluated = Map.of(op1, evaluated(1, 70, 100));
+        scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
+        assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+        assertNull(getScaledParallelism(flinkDep));
+
+        var op2 = new JobVertexID();
+        evaluated =
+                Map.of(
+                        op1, evaluated(1, 70, 100),
+                        op2, evaluated(1, 85, 100));
+        scalingSummary =
+                Map.of(
+                        op1,
+                        new ScalingSummary(1, 2, evaluated.get(op1)),
+                        op2,
+                        new ScalingSummary(1, 2, evaluated.get(op2)));
+
+        assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+        evaluated =
+                Map.of(
+                        op1, evaluated(1, 70, 100),
+                        op2, evaluated(1, 70, 100));
+        scalingSummary =
+                Map.of(
+                        op1,
+                        new ScalingSummary(1, 2, evaluated.get(op1)),
+                        op2,
+                        new ScalingSummary(1, 2, evaluated.get(op2)));
+        assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+        // Test with backlog based scaling
+        evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
+        scalingSummary = Map.of(op1, new ScalingSummary(1, 2, evaluated.get(op1)));
+        assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+    }
+
+    @Test
+    public void testParallelismScaling() {
+        var op = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                5,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                10,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+        assertEquals(
+                8,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+
+        assertEquals(
+                8,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+        assertEquals(
+                10,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+        assertEquals(
+                4,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
+        assertEquals(
+                5,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
+        assertEquals(
+                4,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testParallelismComputation() {
+        final int minParallelism = 1;
+        final int maxParallelism = Integer.MAX_VALUE;
+        assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism));
+        assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism));
+        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism));
+        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism));
+        assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, maxParallelism));
+        assertEquals(
+                720,
+                ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+    }
+
+    @Test
+    public void testParallelismComputationWithLimit() {
+        assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700));
+        assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700));
+
+        assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
+        assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+
+        assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300));
+        assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+    }
+
+    @Test
+    public void ensureMinParallelismDoesNotExceedMax() {
+        Assert.assertThrows(
+                IllegalArgumentException.class,
+                () ->
+                        assertEquals(
+                                600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
+    }
+
+    @Test
+    public void testMinParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        assertEquals(
+                5,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 100, 500),
+                        Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testMaxParallelismLimitIsUsed() {
+        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        assertEquals(
+                10,
+                scalingDecisionExecutor.computeScaleTargetParallelism(
+                        conf,
+                        new JobVertexID(),
+                        evaluated(10, 500, 100),
+                        Collections.emptySortedMap()));
+    }
+
+    @Test
+    public void testScaleDownAfterScaleUpDetection() throws Exception {
+        var op = new JobVertexID();
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1));
+
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
+        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+
+        // Should now allow scale back down immediately
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
+        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+
+        // Pass some time...
+        var clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
+        scalingDecisionExecutor.setClock(clock);
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
+        assertEquals(Map.of(op, 5), getScaledParallelism(flinkDep));
+
+        // Allow immediate scale up
+        scalingDecisionExecutor.scaleResource(
+                flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
+        assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate, double catchupRate) {
+        var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
+        metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
+        metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
+        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+        metrics.put(
+                ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
+        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+        return metrics;
+    }
+
+    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+            int parallelism, double target, double procRate) {
+        return evaluated(parallelism, target, procRate, 0.);
+    }
+
+    protected static Map<JobVertexID, Integer> getScaledParallelism(
+            AbstractFlinkResource<?, ?> resource) {
+
+        var conf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
+        var overrides = conf.get(PARALLELISM_OVERRIDES);
+        if (overrides == null || overrides.isEmpty()) {
+            return null;
+        }
+
+        var out = new HashMap<JobVertexID, Integer>();
+
+        overrides.forEach((k, v) -> out.put(JobVertexID.fromHexString(k), Integer.parseInt(v)));
+        return out;
+    }
+}


[flink-kubernetes-operator] 06/09: [FLINK-30260][autoscaler] Add JobAutoScaler component and configs

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 53b1639ab2fe53f6e8b5aac707d325f86f54b603
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Dec 1 08:58:10 2022 +0100

    [FLINK-30260][autoscaler] Add JobAutoScaler component and configs
---
 .../operator/autoscaler/AutoScalerInfo.java        | 189 +++++++++++++++++++
 .../kubernetes/operator/autoscaler/Cleanup.java    |  30 +++
 .../operator/autoscaler/JobAutoScaler.java         | 204 +++++++++++++++++++++
 .../autoscaler/config/AutoScalerOptions.java       | 121 ++++++++++++
 4 files changed, 544 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
new file mode 100644
index 00000000..1236cc65
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.JobVertexSerDeModule;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Class for encapsulating information stored for each resource when using the autoscaler. */
+public class AutoScalerInfo {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
+
+    private static final int SCALING_HISTORY_MAX_COUNT = 5;
+    private static final Duration SCALING_HISTORY_MAX_DURATION = Duration.ofHours(24);
+
+    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+
+    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
+    private static final String SCALING_HISTORY_KEY = "scalingHistory";
+    private static final String JOB_START_TS_KEY = "jobStartTs";
+
+    private static final ObjectMapper YAML_MAPPER =
+            new ObjectMapper(new YAMLFactory())
+                    .registerModule(new JavaTimeModule())
+                    .registerModule(new JobVertexSerDeModule());
+
+    private final ConfigMap configMap;
+    private Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory;
+
+    public AutoScalerInfo(ConfigMap configMap) {
+        this.configMap = configMap;
+    }
+
+    @VisibleForTesting
+    public AutoScalerInfo(Map<String, String> data) {
+        this(new ConfigMap());
+        configMap.setData(Preconditions.checkNotNull(data));
+    }
+
+    @SneakyThrows
+    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> getMetricHistory() {
+        var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
+        if (historyYaml == null) {
+            return new TreeMap<>();
+        }
+
+        return YAML_MAPPER.readValue(historyYaml, new TypeReference<>() {});
+    }
+
+    @SneakyThrows
+    public void updateMetricHistory(
+            Instant jobStartTs,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> history) {
+        configMap.getData().put(COLLECTED_METRICS_KEY, YAML_MAPPER.writeValueAsString(history));
+        configMap.getData().put(JOB_START_TS_KEY, jobStartTs.toString());
+    }
+
+    public void clearMetricHistory() {
+        configMap.getData().remove(COLLECTED_METRICS_KEY);
+        configMap.getData().remove(JOB_START_TS_KEY);
+    }
+
+    public Optional<Instant> getJobStartTs() {
+        return Optional.ofNullable(configMap.getData().get(JOB_START_TS_KEY)).map(Instant::parse);
+    }
+
+    @SneakyThrows
+    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getScalingHistory() {
+        if (scalingHistory != null) {
+            return scalingHistory;
+        }
+        var yaml = configMap.getData().get(SCALING_HISTORY_KEY);
+        scalingHistory =
+                yaml == null
+                        ? new HashMap<>()
+                        : YAML_MAPPER.readValue(yaml, new TypeReference<>() {});
+        return scalingHistory;
+    }
+
+    @SneakyThrows
+    public void addToScalingHistory(Instant now, Map<JobVertexID, ScalingSummary> summaries) {
+        // Make sure to init history
+        getScalingHistory();
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary));
+
+        var entryIt = scalingHistory.entrySet().iterator();
+        while (entryIt.hasNext()) {
+            var entry = entryIt.next();
+            // Limit how long past scaling decisions are remembered
+            entry.setValue(entry.getValue().tailMap(now.minus(SCALING_HISTORY_MAX_DURATION)));
+            var vertexHistory = entry.getValue();
+            while (vertexHistory.size() > SCALING_HISTORY_MAX_COUNT) {
+                vertexHistory.remove(vertexHistory.lastKey());
+            }
+            if (vertexHistory.isEmpty()) {
+                entryIt.remove();
+            }
+        }
+
+        configMap
+                .getData()
+                .put(SCALING_HISTORY_KEY, YAML_MAPPER.writeValueAsString(scalingHistory));
+    }
+
+    public void replaceInKubernetes(KubernetesClient client) {
+        client.resource(configMap).replace();
+    }
+
+    public static AutoScalerInfo forResource(
+            AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {
+
+        var objectMeta = new ObjectMeta();
+        objectMeta.setName("autoscaler-" + cr.getMetadata().getName());
+        objectMeta.setNamespace(cr.getMetadata().getNamespace());
+
+        ConfigMap infoCm =
+                getScalingInfoConfigMap(objectMeta, kubeClient)
+                        .orElseGet(
+                                () -> {
+                                    LOG.info("Creating scaling info config map");
+
+                                    objectMeta.setLabels(
+                                            Map.of(
+                                                    Constants.LABEL_COMPONENT_KEY,
+                                                    LABEL_COMPONENT_AUTOSCALER,
+                                                    Constants.LABEL_APP_KEY,
+                                                    cr.getMetadata().getName()));
+                                    var cm = new ConfigMap();
+                                    cm.setMetadata(objectMeta);
+                                    cm.addOwnerReference(cr);
+                                    cm.setData(new HashMap<>());
+                                    return kubeClient.resource(cm).create();
+                                });
+
+        return new AutoScalerInfo(infoCm);
+    }
+
+    private static Optional<ConfigMap> getScalingInfoConfigMap(
+            ObjectMeta objectMeta, KubernetesClient kubeClient) {
+        return Optional.ofNullable(
+                kubeClient
+                        .configMaps()
+                        .inNamespace(objectMeta.getNamespace())
+                        .withName(objectMeta.getName())
+                        .get());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
new file mode 100644
index 00000000..ae1b98bb
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/Cleanup.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+
+/** Cleanup interface for autoscaling related metadata. */
+public interface Cleanup {
+    /**
+     * Method is called when a custom resource is deleted.
+     *
+     * @param cr custom resource
+     */
+    void cleanup(AbstractFlinkResource<?, ?> cr);
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
new file mode 100644
index 00000000..d9fb0041
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+
+/** Application and SessionJob autoscaler. */
+public class JobAutoScaler implements Cleanup {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScaler.class);
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkConfigManager configManager;
+    private final ScalingMetricCollector metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor scalingExecutor;
+    private final KubernetesOperatorMetricGroup metricGroup;
+
+    private final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+    private final Map<ResourceID, Set<JobVertexID>> registeredMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScaler(
+            KubernetesClient kubernetesClient,
+            FlinkConfigManager configManager,
+            ScalingMetricCollector metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor scalingExecutor,
+            KubernetesOperatorMetricGroup metricGroup) {
+        this.kubernetesClient = kubernetesClient;
+
+        this.configManager = configManager;
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.metricGroup = metricGroup;
+    }
+
+    @Override
+    public void cleanup(AbstractFlinkResource<?, ?> cr) {
+        LOG.info("Cleaning up autoscaling meta data");
+        metricsCollector.cleanup(cr);
+        scalingExecutor.cleanup(cr);
+        var resourceId = ResourceID.fromResource(cr);
+        lastEvaluatedMetrics.remove(resourceId);
+        registeredMetrics.remove(resourceId);
+    }
+
+    public boolean scale(
+            AbstractFlinkResource<?, ?> resource,
+            FlinkService flinkService,
+            Configuration conf,
+            Context<?> context) {
+
+        if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) {
+            LOG.info("Job autoscaler is disabled");
+            return false;
+        }
+
+        if (!resource.getStatus().getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
+            LOG.info("Job autoscaler is waiting for RUNNING job     state");
+            return false;
+        }
+
+        try {
+            var autoScalerInfo = AutoScalerInfo.forResource(resource, kubernetesClient);
+
+            LOG.info("Collecting metrics for scaling");
+            var collectedMetrics =
+                    metricsCollector.getMetricsHistory(
+                            resource, autoScalerInfo, flinkService, conf);
+
+            if (collectedMetrics == null || collectedMetrics.getMetricHistory().isEmpty()) {
+                LOG.info("No metrics were collected. Skipping scaling step");
+                return false;
+            }
+
+            LOG.debug("Evaluating scaling metrics for {}", collectedMetrics);
+            var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+            LOG.info("Scaling metrics evaluated: {}", evaluatedMetrics);
+            lastEvaluatedMetrics.put(ResourceID.fromResource(resource), evaluatedMetrics);
+            registerResourceScalingMetrics(resource);
+
+            var specAdjusted =
+                    scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
+            autoScalerInfo.replaceInKubernetes(kubernetesClient);
+            return specAdjusted;
+        } catch (Exception e) {
+            LOG.error("Error while scaling resource", e);
+            return false;
+        }
+    }
+
+    private void registerResourceScalingMetrics(AbstractFlinkResource<?, ?> resource) {
+        var resourceId = ResourceID.fromResource(resource);
+        var scalerGroup =
+                metricGroup
+                        .createResourceNamespaceGroup(
+                                configManager.getDefaultConfig(),
+                                resource.getClass(),
+                                resource.getMetadata().getNamespace())
+                        .createResourceNamespaceGroup(
+                                configManager.getDefaultConfig(), resource.getMetadata().getName())
+                        .addGroup("AutoScaler");
+
+        lastEvaluatedMetrics
+                .get(resourceId)
+                .forEach(
+                        (jobVertexID, evaluated) -> {
+                            if (!registeredMetrics
+                                    .computeIfAbsent(resourceId, r -> new HashSet<>())
+                                    .add(jobVertexID)) {
+                                return;
+                            }
+                            LOG.info("Registering scaling metrics for job vertex {}", jobVertexID);
+                            var jobVertexMg =
+                                    scalerGroup.addGroup("jobVertexID", jobVertexID.toHexString());
+
+                            evaluated.forEach(
+                                    (sm, esm) -> {
+                                        var smGroup = jobVertexMg.addGroup(sm.name());
+
+                                        smGroup.gauge(
+                                                "Current",
+                                                () ->
+                                                        Optional.ofNullable(
+                                                                        lastEvaluatedMetrics.get(
+                                                                                resourceId))
+                                                                .map(m -> m.get(jobVertexID))
+                                                                .map(
+                                                                        metrics ->
+                                                                                metrics.get(sm)
+                                                                                        .getCurrent())
+                                                                .orElse(null));
+
+                                        if (sm.isCalculateAverage()) {
+                                            smGroup.gauge(
+                                                    "Average",
+                                                    () ->
+                                                            Optional.ofNullable(
+                                                                            lastEvaluatedMetrics
+                                                                                    .get(
+                                                                                            resourceId))
+                                                                    .map(m -> m.get(jobVertexID))
+                                                                    .map(
+                                                                            metrics ->
+                                                                                    metrics.get(sm)
+                                                                                            .getAverage())
+                                                                    .orElse(null));
+                                        }
+                                    });
+                        });
+    }
+
+    public static JobAutoScaler create(
+            KubernetesClient kubernetesClient,
+            FlinkConfigManager configManager,
+            KubernetesOperatorMetricGroup metricGroup) {
+        return new JobAutoScaler(
+                kubernetesClient,
+                configManager,
+                new RestApiMetricsCollector(),
+                new ScalingMetricEvaluator(),
+                new ScalingExecutor(kubernetesClient),
+                metricGroup);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
new file mode 100644
index 00000000..d9a0b86b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
+
+/** Config options related to the autoscaler module. */
+public class AutoScalerOptions {
+
+    private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
+        return operatorConfig("job.autoscaler." + key);
+    }
+
+    public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
+            autoScalerConfig("enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable job autoscaler module.");
+
+    public static final ConfigOption<Boolean> SCALING_ENABLED =
+            autoScalerConfig("scaling.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");
+
+    public static final ConfigOption<Duration> METRICS_WINDOW =
+            autoScalerConfig("metrics.window")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription("Scaling metrics aggregation window size.");
+
+    public static final ConfigOption<Duration> STABILIZATION_INTERVAL =
+            autoScalerConfig("stabilization.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Stabilization period in which no new scaling will be executed");
+
+    public static final ConfigOption<Boolean> SOURCE_SCALING_ENABLED =
+            autoScalerConfig("scaling.sources.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enable scaling source vertices. "
+                                    + "Source vertices set the baseline ingestion rate for the processing based on the backlog size. "
+                                    + "If disabled, only regular job vertices will be scaled and source vertices will be unchanged.");
+
+    public static final ConfigOption<Double> TARGET_UTILIZATION =
+            autoScalerConfig("target.utilization")
+                    .doubleType()
+                    .defaultValue(0.7)
+                    .withDescription("Target vertex utilization");
+
+    public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
+            autoScalerConfig("target.utilization.boundary")
+                    .doubleType()
+                    .defaultValue(0.1)
+                    .withDescription(
+                            "Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary)");
+
+    public static final ConfigOption<Duration> SCALE_UP_GRACE_PERIOD =
+            autoScalerConfig("scale-up.grace-period")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription("Period in which no scale down is allowed after a scale up");
+
+    public static final ConfigOption<Integer> VERTEX_MIN_PARALLELISM =
+            autoScalerConfig("vertex.min-parallelism")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("The minimum parallelism the autoscaler can use.");
+
+    public static final ConfigOption<Integer> VERTEX_MAX_PARALLELISM =
+            autoScalerConfig("vertex.max-parallelism")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.");
+
+    public static final ConfigOption<Double> MAX_SCALE_DOWN_FACTOR =
+            autoScalerConfig("scale-down.max-factor")
+                    .doubleType()
+                    .defaultValue(0.6)
+                    .withDescription(
+                            "Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.");
+
+    public static final ConfigOption<Duration> CATCH_UP_DURATION =
+            autoScalerConfig("catch-up.duration")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(10))
+                    .withDescription(
+                            "The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
+
+    public static final ConfigOption<Duration> RESTART_TIME =
+            autoScalerConfig("restart.time")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Expected restart time to be used until the operator can determine it reliably from history.");
+}


[flink-kubernetes-operator] 02/09: [FLINK-30260][autoscaler] Utilities for collecting and computing metrics

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 5aa9956e7c61a5d58fb70a5ef8dd0ccb2dcabf53
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 08:55:50 2022 +0100

    [FLINK-30260][autoscaler] Utilities for collecting and computing metrics
---
 .../operator/autoscaler/topology/JobTopology.java  | 154 +++++++++++++++++++++
 .../operator/autoscaler/topology/VertexInfo.java   |  37 +++++
 .../operator/autoscaler/utils/AutoScalerUtils.java |  70 ++++++++++
 .../autoscaler/utils/JobVertexSerDeModule.java     |  54 ++++++++
 .../operator/autoscaler/JobTopologyTest.java       |  99 +++++++++++++
 5 files changed, 414 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
new file mode 100644
index 00000000..3f2462da
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.topology;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Structure representing information about the jobgraph that is relevant for scaling. */
+@ToString
+@EqualsAndHashCode
+public class JobTopology {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Getter private final ImmutableMap<JobVertexID, Set<JobVertexID>> inputs;
+    @Getter private final ImmutableMap<JobVertexID, Set<JobVertexID>> outputs;
+    @Getter private final ImmutableMap<JobVertexID, Integer> parallelisms;
+    private final ImmutableMap<JobVertexID, Integer> originalMaxParallelism;
+    @Getter private final Map<JobVertexID, Integer> maxParallelisms;
+
+    public JobTopology(VertexInfo... vertexInfo) {
+        this(Set.of(vertexInfo));
+    }
+
+    public JobTopology(Set<VertexInfo> vertexInfo) {
+
+        Map<JobVertexID, Set<JobVertexID>> vertexOutputs = new HashMap<>();
+        Map<JobVertexID, Set<JobVertexID>> vertexInputs = new HashMap<>();
+        Map<JobVertexID, Integer> vertexParallelism = new HashMap<>();
+        maxParallelisms = new HashMap<>();
+
+        vertexInfo.forEach(
+                info -> {
+                    var vertexId = info.getId();
+                    vertexParallelism.put(vertexId, info.getParallelism());
+                    maxParallelisms.put(vertexId, info.getMaxParallelism());
+
+                    vertexInputs.put(vertexId, info.getInputs());
+                    vertexOutputs.computeIfAbsent(vertexId, id -> new HashSet<>());
+                    info.getInputs()
+                            .forEach(
+                                    inputId ->
+                                            vertexOutputs
+                                                    .computeIfAbsent(inputId, id -> new HashSet<>())
+                                                    .add(vertexId));
+                });
+
+        var outputBuilder = ImmutableMap.<JobVertexID, Set<JobVertexID>>builder();
+        vertexOutputs.forEach((id, l) -> outputBuilder.put(id, ImmutableSet.copyOf(l)));
+        outputs = outputBuilder.build();
+
+        var inputBuilder = ImmutableMap.<JobVertexID, Set<JobVertexID>>builder();
+        vertexInputs.forEach((id, l) -> inputBuilder.put(id, ImmutableSet.copyOf(l)));
+        this.inputs = inputBuilder.build();
+
+        this.parallelisms = ImmutableMap.copyOf(vertexParallelism);
+        this.originalMaxParallelism = ImmutableMap.copyOf(maxParallelisms);
+    }
+
+    public boolean isSource(JobVertexID jobVertexID) {
+        return getInputs().get(jobVertexID).isEmpty();
+    }
+
+    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
+        maxParallelisms.put(
+                vertexID, Math.min(originalMaxParallelism.get(vertexID), maxParallelism));
+    }
+
+    public List<JobVertexID> getVerticesInTopologicalOrder() {
+        List<JobVertexID> sorted = new ArrayList<>(inputs.size());
+
+        Map<JobVertexID, List<JobVertexID>> remainingInputs = new HashMap<>(inputs.size());
+        inputs.forEach((v, l) -> remainingInputs.put(v, new ArrayList<>(l)));
+
+        while (!remainingInputs.isEmpty()) {
+            List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>();
+            remainingInputs.forEach(
+                    (v, inputs) -> {
+                        if (inputs.isEmpty()) {
+                            verticesWithZeroIndegree.add(v);
+                        }
+                    });
+
+            verticesWithZeroIndegree.forEach(
+                    v -> {
+                        remainingInputs.remove(v);
+                        outputs.get(v).forEach(o -> remainingInputs.get(o).remove(v));
+                    });
+
+            sorted.addAll(verticesWithZeroIndegree);
+        }
+        return sorted;
+    }
+
+    public static JobTopology fromJsonPlan(
+            String jsonPlan, Map<JobVertexID, Integer> maxParallelismMap)
+            throws JsonProcessingException {
+        ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
+        ArrayNode nodes = (ArrayNode) plan.get("nodes");
+
+        var vertexInfo = new HashSet<VertexInfo>();
+
+        for (JsonNode node : nodes) {
+            var vertexId = JobVertexID.fromHexString(node.get("id").asText());
+            var inputList = new HashSet<JobVertexID>();
+            vertexInfo.add(
+                    new VertexInfo(
+                            vertexId,
+                            inputList,
+                            node.get("parallelism").asInt(),
+                            maxParallelismMap.get(vertexId)));
+            if (node.has("inputs")) {
+                for (JsonNode input : node.get("inputs")) {
+                    inputList.add(JobVertexID.fromHexString(input.get("id").asText()));
+                }
+            }
+        }
+
+        return new JobTopology(vertexInfo);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
new file mode 100644
index 00000000..1137e9e6
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.topology;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Set;
+
+/** Job vertex information. */
+@Value
+public class VertexInfo {
+
+    JobVertexID id;
+
+    Set<JobVertexID> inputs;
+
+    int parallelism;
+
+    int maxParallelism;
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
new file mode 100644
index 00000000..a1353f65
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+
+/** AutoScaler utilities. */
+public class AutoScalerUtils {
+
+    public static double getTargetProcessingCapacity(
+            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+            Configuration conf,
+            double targetUtilization,
+            boolean withRestart) {
+
+        // Target = Lag Catchup Rate + Restart Catchup Rate + Processing at utilization
+        // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL
+
+        double lagCatchupTargetRate = evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent();
+        if (Double.isNaN(lagCatchupTargetRate)) {
+            return Double.NaN;
+        }
+
+        double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
+        double restartTimeSec = conf.get(AutoScalerOptions.RESTART_TIME).toSeconds();
+
+        targetUtilization = Math.max(0., targetUtilization);
+        targetUtilization = Math.min(1., targetUtilization);
+
+        double avgInputTargetRate = evaluatedMetrics.get(TARGET_DATA_RATE).getAverage();
+        if (Double.isNaN(avgInputTargetRate)) {
+            return Double.NaN;
+        }
+
+        if (targetUtilization == 0) {
+            return Double.POSITIVE_INFINITY;
+        }
+
+        double restartCatchupRate =
+                !withRestart || catchUpTargetSec == 0
+                        ? 0
+                        : (avgInputTargetRate * restartTimeSec) / catchUpTargetSec;
+        double inputTargetAtUtilization = avgInputTargetRate / targetUtilization;
+
+        return Math.round(lagCatchupTargetRate + restartCatchupRate + inputTargetAtUtilization);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
new file mode 100644
index 00000000..5a5195a0
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.utils;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import java.io.IOException;
+
+/** Jackson serializer module for {@link JobVertexID}. */
+public class JobVertexSerDeModule extends SimpleModule {
+
+    public JobVertexSerDeModule() {
+        this.addKeySerializer(JobVertexID.class, new JobVertexIdKeySerializer());
+        this.addKeyDeserializer(JobVertexID.class, new JobVertexIdKeyDeserializer());
+    }
+
+    private static class JobVertexIdKeySerializer extends JsonSerializer<JobVertexID> {
+        @Override
+        public void serialize(JobVertexID value, JsonGenerator jgen, SerializerProvider provider)
+                throws IOException {
+
+            jgen.writeFieldName(value.toHexString());
+        }
+    }
+
+    private static class JobVertexIdKeyDeserializer extends KeyDeserializer {
+        @Override
+        public Object deserializeKey(String s, DeserializationContext deserializationContext) {
+            return JobVertexID.fromHexString(s);
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
new file mode 100644
index 00000000..406c2e30
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for JobTopology parsing logic. */
+public class JobTopologyTest {
+
+    @Test
+    public void testTopologyFromJson() throws JsonProcessingException {
+        var env = StreamExecutionEnvironment.getExecutionEnvironment();
+        var s1 = env.fromElements(1).name("s1");
+        var s2 = env.fromElements(1).name("s2");
+
+        s1.union(s2)
+                .shuffle()
+                .map(i -> i)
+                .name("map1")
+                .setParallelism(2)
+                .shuffle()
+                .print()
+                .name("sink1")
+                .setParallelism(3);
+
+        var s3 = env.fromElements(1).name("s3");
+        var map2 = s3.shuffle().map(i -> i).name("map2").setParallelism(4).shuffle();
+
+        map2.print().name("sink2").setParallelism(5);
+        map2.print().name("sink3").setParallelism(6);
+
+        var jobGraph = env.getStreamGraph().getJobGraph();
+        var jsonPlan = JsonPlanGenerator.generatePlan(jobGraph);
+
+        var vertices = new HashMap<String, JobVertexID>();
+        var maxParallelism = new HashMap<JobVertexID, Integer>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertices.put(vertex.getName(), vertex.getID());
+            maxParallelism.put(
+                    vertex.getID(),
+                    vertex.getMaxParallelism() != -1
+                            ? vertex.getMaxParallelism()
+                            : SchedulerBase.getDefaultMaxParallelism(vertex));
+        }
+
+        JobTopology jobTopology = JobTopology.fromJsonPlan(jsonPlan, maxParallelism);
+
+        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink1")).isEmpty());
+        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink2")).isEmpty());
+        assertTrue(jobTopology.getOutputs().get(vertices.get("Sink: sink3")).isEmpty());
+
+        assertEquals(
+                Set.of(vertices.get("map1")),
+                jobTopology.getOutputs().get(vertices.get("Source: s1")));
+        assertEquals(
+                Set.of(vertices.get("map1")),
+                jobTopology.getOutputs().get(vertices.get("Source: s2")));
+        assertEquals(
+                Set.of(vertices.get("map2")),
+                jobTopology.getOutputs().get(vertices.get("Source: s3")));
+
+        assertEquals(
+                Set.of(vertices.get("Sink: sink2"), vertices.get("Sink: sink3")),
+                jobTopology.getOutputs().get(vertices.get("map2")));
+
+        assertEquals(2, jobTopology.getParallelisms().get(vertices.get("map1")));
+        assertEquals(4, jobTopology.getParallelisms().get(vertices.get("map2")));
+        jobTopology.getMaxParallelisms().forEach((v, p) -> assertEquals(128, p));
+    }
+}


[flink-kubernetes-operator] 08/09: [FLINK-30260][autoscaler] Autoscaler example application

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit c5e437f35ff88133e65b07d2b2e52638fa6a1f05
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Dec 1 08:49:25 2022 +0100

    [FLINK-30260][autoscaler] Autoscaler example application
---
 examples/autoscaling/Dockerfile                    |  20 +++
 examples/autoscaling/autoscaling.yaml              |  63 +++++++
 examples/autoscaling/pom.xml                       | 188 +++++++++++++++++++++
 .../main/java/autoscaling/AutoscalingExample.java  |  40 +++++
 .../src/main/resources/log4j2.properties           |  28 +++
 5 files changed, 339 insertions(+)

diff --git a/examples/autoscaling/Dockerfile b/examples/autoscaling/Dockerfile
new file mode 100644
index 00000000..63f78418
--- /dev/null
+++ b/examples/autoscaling/Dockerfile
@@ -0,0 +1,20 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM ghcr.io/apache/flink-docker:1.17-SNAPSHOT-scala_2.12-java11-debian
+COPY ./target/autoscaling*.jar /opt/flink/usrlib/autoscaling.jar
diff --git a/examples/autoscaling/autoscaling.yaml b/examples/autoscaling/autoscaling.yaml
new file mode 100644
index 00000000..6f638c26
--- /dev/null
+++ b/examples/autoscaling/autoscaling.yaml
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: autoscaling-example
+spec:
+  image: autoscaling-example
+  flinkVersion: v1_16
+  flinkConfiguration:
+    kubernetes.operator.job.autoscaler.enabled: "true"
+    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
+    kubernetes.operator.job.autoscaler.scaling.sources.enabled: "false"
+    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
+
+    taskmanager.numberOfTaskSlots: "2"
+    pipeline.max-parallelism: "720"
+    state.savepoints.dir: file:///flink-data/savepoints
+    state.checkpoints.dir: file:///flink-data/checkpoints
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: file:///flink-data/ha
+    execution.checkpointing.interval: "1m"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.5
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  podTemplate:
+    spec:
+      containers:
+        - name: flink-main-container
+          volumeMounts:
+            - mountPath: /flink-data
+              name: flink-volume
+      volumes:
+        - name: flink-volume
+          hostPath:
+            path: /tmp/flink
+            type: Directory
+  job:
+    jarURI: local:///opt/flink/usrlib/autoscaling.jar
+    parallelism: 1
+    upgradeMode: last-state
diff --git a/examples/autoscaling/pom.xml b/examples/autoscaling/pom.xml
new file mode 100644
index 00000000..250c39eb
--- /dev/null
+++ b/examples/autoscaling/pom.xml
@@ -0,0 +1,188 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>../..</relativePath>
+    </parent>
+
+    <artifactId>autoscaling</artifactId>
+
+    <name>Flink Autoscaler Test Job</name>
+
+    <!-- Given that this is an example skip maven deployment -->
+    <properties>
+        <maven.deploy.skip>true</maven.deploy.skip>
+    </properties>
+
+    <dependencies>
+
+        <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Add logging framework, to produce console output when running in the IDE. -->
+        <!-- These dependencies are excluded from the application JAR by default. -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.source}</target>
+                </configuration>
+            </plugin>
+
+            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
+            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>org.apache.logging.log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>autoscaling.AutoscalingExample</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+
+                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-shade-plugin</artifactId>
+                                        <versionRange>[3.0.0,)</versionRange>
+                                        <goals>
+                                            <goal>shade</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
diff --git a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
new file mode 100644
index 00000000..c74fd52c
--- /dev/null
+++ b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package autoscaling;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Autoscaling Example. */
+public class AutoscalingExample {
+    public static void main(String[] args) throws Exception {
+        var env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Long> stream = env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE);
+        stream =
+                stream.shuffle()
+                        .map(
+                                i -> {
+                                    // Add sleep to artificially slow down processing
+                                    // Thread.sleep(sleep);
+                                    return i;
+                                });
+        stream.print();
+        env.execute("Autoscaling Example");
+    }
+}
diff --git a/examples/autoscaling/src/main/resources/log4j2.properties b/examples/autoscaling/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..f0630a3b
--- /dev/null
+++ b/examples/autoscaling/src/main/resources/log4j2.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = WARN
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+logger.slowjob.name = slowjob
+logger.slowjob.level = INFO
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n


[flink-kubernetes-operator] 04/09: [FLINK-30260][autoscaler] Add scaling metric evaluation logic

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 0b3cc1312e9070ac0a28cea76e1c6d9284f52acd
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 08:59:46 2022 +0100

    [FLINK-30260][autoscaler] Add scaling metric evaluation logic
---
 .../autoscaler/ScalingMetricEvaluator.java         | 228 +++++++++++++
 .../autoscaler/BacklogBasedScalingTest.java        | 352 +++++++++++++++++++++
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 227 +++++++++++++
 3 files changed, 807 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
new file mode 100644
index 00000000..fdf5e33f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.stat.StatUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Job scaling evaluator for autoscaler. */
+public class ScalingMetricEvaluator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);
+
+    private Clock clock = Clock.systemDefaultZone();
+
+    public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate(
+            Configuration conf, CollectedMetrics collectedMetrics) {
+
+        var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
+        var metricsHistory = collectedMetrics.getMetricHistory();
+        var topology = collectedMetrics.getJobTopology();
+
+        for (var vertex : topology.getVerticesInTopologicalOrder()) {
+            scalingOutput.put(
+                    vertex,
+                    computeVertexScalingSummary(
+                            conf, scalingOutput, metricsHistory, topology, vertex));
+        }
+
+        return scalingOutput;
+    }
+
+    @NotNull
+    private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary(
+            Configuration conf,
+            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+            JobTopology topology,
+            JobVertexID vertex) {
+
+        var latestVertexMetrics = metricsHistory.get(metricsHistory.lastKey()).get(vertex);
+
+        var evaluatedMetrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+        computeTargetDataRate(
+                topology,
+                vertex,
+                conf,
+                scalingOutput,
+                metricsHistory,
+                latestVertexMetrics,
+                evaluatedMetrics);
+
+        evaluatedMetrics.put(
+                TRUE_PROCESSING_RATE,
+                new EvaluatedScalingMetric(
+                        latestVertexMetrics.get(TRUE_PROCESSING_RATE),
+                        getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory, conf)));
+
+        evaluatedMetrics.put(
+                PARALLELISM, EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex)));
+        evaluatedMetrics.put(
+                MAX_PARALLELISM,
+                EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
+
+        computeProcessingRateThresholds(evaluatedMetrics, conf);
+
+        var isSink = topology.getOutputs().get(vertex).isEmpty();
+        if (!isSink) {
+            evaluatedMetrics.put(
+                    TRUE_OUTPUT_RATE,
+                    new EvaluatedScalingMetric(
+                            latestVertexMetrics.get(TRUE_OUTPUT_RATE),
+                            getAverage(TRUE_OUTPUT_RATE, vertex, metricsHistory, conf)));
+            evaluatedMetrics.put(
+                    OUTPUT_RATIO,
+                    new EvaluatedScalingMetric(
+                            latestVertexMetrics.get(OUTPUT_RATIO),
+                            getAverage(OUTPUT_RATIO, vertex, metricsHistory, conf)));
+        }
+
+        return evaluatedMetrics;
+    }
+
+    @VisibleForTesting
+    protected static void computeProcessingRateThresholds(
+            Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf) {
+
+        double utilizationBoundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
+
+        double scaleUpThreshold =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        metrics, conf, conf.get(TARGET_UTILIZATION) + utilizationBoundary, false);
+
+        double scaleDownThreshold =
+                AutoScalerUtils.getTargetProcessingCapacity(
+                        metrics, conf, conf.get(TARGET_UTILIZATION) - utilizationBoundary, true);
+
+        metrics.put(SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleUpThreshold));
+        metrics.put(SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleDownThreshold));
+    }
+
+    private void computeTargetDataRate(
+            JobTopology topology,
+            JobVertexID vertex,
+            Configuration conf,
+            HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Map<ScalingMetric, EvaluatedScalingMetric> out) {
+
+        boolean isSource = topology.getInputs().get(vertex).isEmpty();
+        if (isSource) {
+            double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
+
+            var sourceRateMetric =
+                    latestVertexMetrics.containsKey(TARGET_DATA_RATE)
+                            ? TARGET_DATA_RATE
+                            : SOURCE_DATA_RATE;
+            if (!latestVertexMetrics.containsKey(sourceRateMetric)) {
+                throw new RuntimeException(
+                        "Cannot evaluate metrics without source target rate information");
+            }
+
+            out.put(
+                    TARGET_DATA_RATE,
+                    new EvaluatedScalingMetric(
+                            latestVertexMetrics.get(sourceRateMetric),
+                            getAverage(sourceRateMetric, vertex, metricsHistory, conf)));
+
+            double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
+            double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / catchUpTargetSec;
+            if (catchUpInputRate > 0) {
+                LOG.info(
+                        "Extra backlog processing input rate for {} is {}",
+                        vertex,
+                        catchUpInputRate);
+            }
+            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpInputRate));
+        } else {
+            var inputs = topology.getInputs().get(vertex);
+            double sumCurrentTargetRate = 0;
+            double sumAvgTargetRate = 0;
+            double sumCatchUpDataRate = 0;
+            for (var inputVertex : inputs) {
+                var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
+                var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
+                var outputRateMultiplier = inputEvaluatedMetrics.get(OUTPUT_RATIO).getAverage();
+                sumCurrentTargetRate += inputTargetRate.getCurrent() * outputRateMultiplier;
+                sumAvgTargetRate += inputTargetRate.getAverage() * outputRateMultiplier;
+                sumCatchUpDataRate +=
+                        inputEvaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent()
+                                * outputRateMultiplier;
+            }
+            out.put(
+                    TARGET_DATA_RATE,
+                    new EvaluatedScalingMetric(sumCurrentTargetRate, sumAvgTargetRate));
+            out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(sumCatchUpDataRate));
+        }
+    }
+
+    private double getAverage(
+            ScalingMetric metric,
+            JobVertexID jobVertexId,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+            Configuration conf) {
+        return StatUtils.mean(
+                metricsHistory
+                        .tailMap(clock.instant().minus(conf.get(AutoScalerOptions.METRICS_WINDOW)))
+                        .values().stream()
+                        .map(m -> m.get(jobVertexId))
+                        .filter(m -> m.containsKey(metric))
+                        .mapToDouble(m -> m.get(metric))
+                        .filter(d -> !Double.isNaN(d))
+                        .toArray());
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
new file mode 100644
index 00000000..1e1466c8
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Test for scaling metrics collection logic. */
+@EnableKubernetesMockClient(crud = true)
+public class BacklogBasedScalingTest {
+
+    private ScalingMetricEvaluator evaluator;
+    private TestingFlinkService service;
+    private TestingMetricsCollector metricsCollector;
+    private ScalingExecutor scalingExecutor;
+
+    private FlinkDeployment app;
+    private JobVertexID source1, sink;
+
+    private FlinkConfigManager confManager;
+    private JobAutoScaler autoscaler;
+
+    private KubernetesClient kubernetesClient;
+
+    @BeforeEach
+    public void setup() {
+        evaluator = new ScalingMetricEvaluator();
+        scalingExecutor = new ScalingExecutor(kubernetesClient);
+        service = new TestingFlinkService();
+
+        app = TestUtils.buildApplicationCluster();
+        app.getMetadata().setGeneration(1L);
+        app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
+        kubernetesClient.resource(app).createOrReplace();
+
+        source1 = new JobVertexID();
+        sink = new JobVertexID();
+
+        metricsCollector =
+                new TestingMetricsCollector(
+                        new JobTopology(
+                                new VertexInfo(source1, Set.of(), 1, 720),
+                                new VertexInfo(sink, Set.of(source1), 1, 720)));
+
+        var defaultConf = new Configuration();
+        defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+        defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
+        defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
+        defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+        defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+
+        confManager = new FlinkConfigManager(defaultConf);
+        ReconciliationUtils.updateStatusForDeployedSpec(
+                app, confManager.getDeployConfig(app.getMetadata(), app.getSpec()));
+        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+
+        autoscaler =
+                new JobAutoScaler(
+                        kubernetesClient,
+                        confManager,
+                        metricsCollector,
+                        evaluator,
+                        scalingExecutor,
+                        TestUtils.createTestMetricGroup(new Configuration()));
+    }
+
+    @Test
+    public void test() throws Exception {
+        var ctx = createAutoscalerTestContext();
+        var now = Instant.now();
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 2000.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 500.))));
+
+        autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx);
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx);
+
+        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        metricsCollector.setJobTopology(
+                new JobTopology(
+                        new VertexInfo(source1, Set.of(), 4, 24),
+                        new VertexInfo(sink, Set.of(source1), 4, 720)));
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 2500.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1800.))));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1200.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1800.))));
+
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        // We have finally caught up to our original lag, time to scale down
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 800.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+        metricsCollector.setJobTopology(
+                new JobTopology(
+                        new VertexInfo(source1, Set.of(), 2, 24),
+                        new VertexInfo(sink, Set.of(source1), 2, 720)));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 900.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 100.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 900.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 500.))));
+        now = now.plus(Duration.ofSeconds(1));
+        setClocksTo(now);
+        autoscaler.scale(
+                app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(sink));
+    }
+
+    private void setClocksTo(Instant time) {
+        var clock = Clock.fixed(time, ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        evaluator.setClock(clock);
+        scalingExecutor.setClock(clock);
+    }
+
+    @NotNull
+    private TestUtils.TestingContext<HasMetadata> createAutoscalerTestContext() {
+        return new TestUtils.TestingContext<>() {
+            public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
+                return (Set)
+                        kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream()
+                                .collect(Collectors.toSet());
+            }
+        };
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
new file mode 100644
index 00000000..7a4af640
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Scaling evaluator test. */
+public class ScalingMetricEvaluatorTest {
+
+    @Test
+    public void testLagBasedSourceScaling() {
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE, 200.,
+                                LAG, 1000.,
+                                OUTPUT_RATIO, 2.,
+                                TRUE_OUTPUT_RATE, 200.,
+                                TRUE_PROCESSING_RATE, 200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        var evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(500),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(1000),
+                evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(1000),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(2000),
+                evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        // Restart time should not affect evaluated metrics
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
+
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(1000),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(2000),
+                evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        // Turn off lag based scaling
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(0), evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(0), evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+        // Test 0 lag
+        metricHistory.clear();
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
+        evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+        assertEquals(
+                new EvaluatedScalingMetric(100, 100),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                new EvaluatedScalingMetric(200, 200),
+                evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+    }
+
+    @Test
+    public void testUtilizationBoundaryComputation() {
+
+        var conf = new Configuration();
+        conf.set(TARGET_UTILIZATION, 0.8);
+        conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+        conf.set(RESTART_TIME, Duration.ofSeconds(1));
+        conf.set(CATCH_UP_DURATION, Duration.ZERO);
+
+        // Default behaviour, restart time does not factor in
+        assertEquals(Tuple2.of(778.0, 1000.0), getThresholds(700, 0, conf));
+
+        conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
+        assertEquals(Tuple2.of(1128.0, 1700.0), getThresholds(700, 350, conf));
+        assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
+    }
+
+    private Tuple2<Double, Double> getThresholds(
+            double inputTargetRate, double catchUpRate, Configuration conf) {
+        var map = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+        map.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN, inputTargetRate));
+        map.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
+
+        ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf);
+        return Tuple2.of(
+                map.get(SCALE_UP_RATE_THRESHOLD).getCurrent(),
+                map.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent());
+    }
+}


[flink-kubernetes-operator] 09/09: [docs][autoscaler] Add docs page + enable config doc generation for Autoscaler options

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 86458aac1c1c6fc675a38ea54c93cb46900a2419
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 15 11:27:00 2022 +0100

    [docs][autoscaler] Add docs page + enable config doc generation for Autoscaler options
---
 .github/workflows/ci.yml                           |   2 +-
 docs/content/docs/custom-resource/autoscaler.md    | 158 +++++++++++++++++++++
 docs/content/docs/operations/configuration.md      |   6 +
 .../generated/auto_scaler_configuration.html       |  90 ++++++++++++
 .../configuration/ConfigOptionsDocGenerator.java   |   6 +-
 5 files changed, 260 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c48b631c..c3891788 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -41,7 +41,7 @@ jobs:
       - name: Build with Maven
         run: |
           set -o pipefail; mvn clean install javadoc:javadoc -Pgenerate-docs | tee ./mvn.log; set +o pipefail
-          if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, flink-runtime-.*.jar) define 2 overlapping classes' | grep -c "overlapping classes" -) -gt 0 ]];then
+          if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, flink-runtime-.*.jar) define 3 overlapping classes' | grep -c "overlapping classes" -) -gt 0 ]];then
             echo "Found overlapping classes: "
             cat ./mvn.log | grep "overlapping classes"
             exit 1
diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md
new file mode 100644
index 00000000..db7a6d36
--- /dev/null
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -0,0 +1,158 @@
+---
+title: "Autoscaler"
+weight: 4
+type: docs
+aliases:
+- /custom-resource/autoscaler.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Autoscaler
+
+The operator provides a job autoscaler functionality that collects various metrics from running Flink jobs and automatically scales individual job vertexes (chained operator groups) to eliminate backpressure and satisfy the utilization and catch-up duration target set by the user.
+By adjusting parallelism on a job vertex level (in contrast to job parallelism) we can efficiently autoscale complex and heterogeneous streaming applications.
+
+Key benefits to the user:
+ - Better cluster resource utilization and lower operating costs
+ - Automatic parallelism tuning for even complex streaming pipelines
+ - Automatic adaptation to changing load patterns
+ - Detailed utilization metrics for performance debugging
+
+Job requirements:
+ - The autoscaler currently only works with the latest [Flink 1.17 snapshot images](ghcr.io/apache/flink-docker:1.17-SNAPSHOT-scala_2.12-java11-debian) or after backporting the following fixes to your 1.15/1.16 Flink image
+   - [Job vertex parallelism overrides](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9) (must have)
+   - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
+ - All sources must use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) (most common connectors already do)
+ - Source scaling requires sources to expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (source scaling can be disabled)
+
+In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
+
+{{< hint info >}}
+The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
+This can be used to gain confidence in the module without any impact on the running applications.
+
+To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< /hint >}}
+
+## Configuration guide
+
+Depending on your environment and job characteristics there are a few very important configurations that will affect how well the autoscaler works.
+
+Key configuration areas
+ - Job and per operator max parallelism
+ - Stabilization and metrics collection intervals
+ - Target utilization and flexible boundaries
+ - Target catch-up duration and restart time
+
+The defaults might work reasonably well for many applications, but some tuning may be required in this early stage of the autoscaler module.
+
+### Job and per operator max parallelism
+
+When computing the scaled parallelism, the autoscaler always considers the max parallelism settings for each job vertex to ensure that it doesn't introduce unnecessary data skew.
+The computed parallelism will always be a divisor of the max_parallelism number.
+
+To ensure flexible scaling it is therefore recommended to chose max parallelism settings that have a [lot of divisors](https://en.wikipedia.org/wiki/Highly_composite_number) instead of relying on the Flink provided defaults.
+You can then use the `pipeline.max-parallelism` to configure this for your pipeline.
+
+Some good numbers for max-parallelism are: 120, 180, 240, 360, 720 etc.
+
+It is also possible to set maxParallelism on a per operator level, which can be useful if we want to avoid scaling some sources/sinks beyond a certain number.
+
+### Stabilization and metrics collection intervals
+
+The autoscaler always looks at average metrics in the collection time window defined by `kubernetes.operator.job.autoscaler.metrics.window`.
+The size of this window determines how small fluctuations will affect the autoscaler. The larger the window, the more smoothing and stability we get, but we may be slower to react to sudden load changes.
+We suggest you experiment with setting this anywhere between 3-60 minutes for best experience.
+
+To allow jobs to stabilize after recovery users can configure a stabilization window by setting `kubernetes.operator.job.autoscaler.stabilization.interval`. 
+During this time period no scaling actions will be taken.
+
+{{< hint warning >}}
+Currently the autoscaler treats the collection window as the **maximum** window. Metric evaluation will start right after the stabilization period.
+We also include metrics collected during the stabilization period at the moment which might cause some instability with very low stabilization periods.
+
+We are working on improving this.
+{{< /hint >}}
+
+### Target utilization and flexible boundaries
+
+In order to provide stable job performance and some buffer for load fluctuations, the autoscaler allows users to set a target utilization level for the job (`kubernetes.operator.job.autoscaler.target.utilization`).
+A target of `0.6` means we are targeting 60% utilization/load for the job vertexes.
+
+In general, it's not recommended to set target utilization close to 100% as performance usually degrades as we reach capacity limits in most real world systems.
+
+In addition to the utilization target we can set a utilization boundary, that serves as extra buffer to avoid immediate scaling on load fluctuations.
+Setting `kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"` means that we allow 20% deviation from the target utilization before triggering a scaling action.
+
+### Target catch-up duration and restart time
+
+When taking scaling decisions the operator need to account for the extra capacity required to catch up the backlog created during scaling operations.
+The amount of extra capacity is determined automatically by the following 2 configs:
+
+ - `kubernetes.operator.job.autoscaler.restart.time` : Time it usually takes to restart the application
+ - `kubernetes.operator.job.autoscaler.catch-up.duration` : Time to job is expected to catch up after scaling 
+
+In the future the autoscaler may be able to automatically determine the restart time, but the target catch-up duration depends on the users SLO.
+
+By lowering the catch-up duration the autoscaler will have to reserve more extra capacity for the scaling actions.
+We suggest setting this based on your actual objective, such us 1, 5, 10 minutes etc.
+
+### Basic configuration example
+```yaml
+...
+flinkVersion: v1_17
+flinkConfiguration:
+    kubernetes.operator.job.autoscaler.enabled: "true"
+    kubernetes.operator.job.autoscaler.stabilization.interval: "5m"
+    kubernetes.operator.job.autoscaler.metrics.window: "5m"
+    kubernetes.operator.job.autoscaler.target.utilization: "0.6"
+    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
+    kubernetes.operator.job.autoscaler.restart.time: 2m
+    kubernetes.operator.job.autoscaler.catch-up.duration: 5m
+
+    pipeline.max-parallelism: "720"
+```
+
+### Advanced config parameters
+
+The autoscaler also exposes various more advanced config parameters that affect scaling actions:
+
+ - Minimum time before scaling down after scaling up a vertex
+ - Maximum parallelism change when scaling down
+ - Min/max parallelism
+
+The list of options will likely grow to cover more complex scaling scenarios.
+
+For a detailed config reference check the [general configuration page]({{< ref "docs/operations/configuration#autoscaler-configuration" >}})
+
+## Metrics
+
+The operator reports detailed jobvertex level metrics about the evaluated Flink job metrics that are collected and used in the scaling decision.
+
+This includes:
+ - Utilization, input rate, target rate metrics
+ - Scaling thresholds
+ - Parallelism and max parallelism changes over time
+
+These metrics are reported under the Kubernetes Operator Resource metric group:
+
+```
+[resource_prefix].Autoscaler.[jobVertexID].[ScalingMetric].Current/Average
+```
diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md
index 4a328399..5db2f1b5 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -95,6 +95,12 @@ These options can be configured on both an operator and a per-resource level. Wh
 
 {{< generated/dynamic_section >}}
 
+### Autoscaler Configuration
+
+Like other resource options these can be configured on both an operator and a per-resource level. When set under `spec.flinkConfiguration` for the Flink resources it will override the default value provided in the operator default configuration (`flink-conf.yaml`).
+
+{{< generated/auto_scaler_configuration >}}
+
 ### System Metrics Configuration
 
 Operator system metrics configuration. Cannot be overridden on a per-resource basis.
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
new file mode 100644
index 00000000..af9b4396
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -0,0 +1,90 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
+            <td style="word-wrap: break-word;">10 min</td>
+            <td>Duration</td>
+            <td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable job autoscaler module.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.metrics.window</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Scaling metrics aggregation window size.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.restart.time</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Expected restart time to be used until the operator can determine it reliably from history.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scale-down.max-factor</h5></td>
+            <td style="word-wrap: break-word;">0.6</td>
+            <td>Double</td>
+            <td>Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scale-up.grace-period</h5></td>
+            <td style="word-wrap: break-word;">10 min</td>
+            <td>Duration</td>
+            <td>Period in which no scale down is allowed after a scale up</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scaling.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scaling.sources.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable scaling source vertices. Source vertices set the baseline ingestion rate for the processing based on the backlog size. If disabled, only regular job vertices will be scaled and source vertices will be unchanged.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Stabilization period in which no new scaling will be executed</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.target.utilization</h5></td>
+            <td style="word-wrap: break-word;">0.7</td>
+            <td>Double</td>
+            <td>Target vertex utilization</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.target.utilization.boundary</h5></td>
+            <td style="word-wrap: break-word;">0.1</td>
+            <td>Double</td>
+            <td>Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary)</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.vertex.max-parallelism</h5></td>
+            <td style="word-wrap: break-word;">2147483647</td>
+            <td>Integer</td>
+            <td>The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.vertex.min-parallelism</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The minimum parallelism the autoscaler can use.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
index 43c81b42..0f302dcd 100644
--- a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
@@ -74,7 +74,11 @@ public class ConfigOptionsDocGenerator {
                 new OptionsClassLocation(
                         "flink-kubernetes-operator", "org.apache.flink.kubernetes.operator.config"),
                 new OptionsClassLocation(
-                        "flink-kubernetes-operator", "org.apache.flink.kubernetes.operator.metrics")
+                        "flink-kubernetes-operator",
+                        "org.apache.flink.kubernetes.operator.metrics"),
+                new OptionsClassLocation(
+                        "flink-kubernetes-operator",
+                        "org.apache.flink.kubernetes.operator.autoscaler.config")
             };
     static final String DEFAULT_PATH_PREFIX = "src/main/java";
 


[flink-kubernetes-operator] 03/09: [FLINK-30260][autoscaler] Collect and compute scaling metrics through Flink REST API

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit d535333aae312fdd3ea867743d6acfa006a95048
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 08:48:51 2022 +0100

    [FLINK-30260][autoscaler] Collect and compute scaling metrics through Flink REST API
---
 .../autoscaler/RestApiMetricsCollector.java        | 100 +++++
 .../autoscaler/ScalingMetricCollector.java         | 443 +++++++++++++++++++++
 .../operator/autoscaler/metrics/FlinkMetric.java   |  49 +++
 .../rest/messages/job/metrics/IOMetricsInfo.java   | 191 +++++++++
 .../MetricsCollectionAndEvaluationTest.java        | 251 ++++++++++++
 .../autoscaler/TestingMetricsCollector.java        |  79 ++++
 6 files changed, 1113 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
new file mode 100644
index 00000000..9c359502
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Metric collector using flink rest api. */
+public class RestApiMetricsCollector extends ScalingMetricCollector {
+    private static final Logger LOG = LoggerFactory.getLogger(RestApiMetricsCollector.class);
+
+    @Override
+    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(
+            AbstractFlinkResource<?, ?> cr,
+            FlinkService flinkService,
+            Configuration conf,
+            Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames) {
+
+        return filteredVertexMetricNames.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                e -> e.getKey(),
+                                e ->
+                                        queryAggregatedVertexMetrics(
+                                                flinkService, cr, conf, e.getKey(), e.getValue())));
+    }
+
+    @SneakyThrows
+    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            JobVertexID jobVertexID,
+            Map<String, FlinkMetric> metrics) {
+
+        LOG.info("Querying metrics {} for {}", metrics, jobVertexID);
+
+        var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
+
+        var parameters = new AggregatedSubtaskMetricsParameters();
+        var pathIt = parameters.getPathParameters().iterator();
+
+        ((JobIDPathParameter) pathIt.next()).resolve(jobId);
+        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);
+
+        parameters
+                .getQueryParameters()
+                .iterator()
+                .next()
+                .resolveFromString(StringUtils.join(metrics.keySet(), ","));
+
+        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
+
+            var responseBody =
+                    restClient
+                            .sendRequest(
+                                    AggregatedSubtaskMetricsHeaders.getInstance(),
+                                    parameters,
+                                    EmptyRequestBody.getInstance())
+                            .get();
+
+            return responseBody.getMetrics().stream()
+                    .collect(Collectors.toMap(m -> metrics.get(m.getId()), m -> m));
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
new file mode 100644
index 00000000..993e6760
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.SneakyThrows;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
+
+/** Metric collector using flink rest api. */
+public abstract class ScalingMetricCollector implements Cleanup {
+    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);
+
+    private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, FlinkMetric>>>>
+            availableVertexMetricNames = new ConcurrentHashMap<>();
+
+    private final Map<ResourceID, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>>
+            histories = new ConcurrentHashMap<>();
+
+    private final Map<ResourceID, JobTopology> topologies = new ConcurrentHashMap<>();
+
+    private Clock clock = Clock.systemDefaultZone();
+
+    public CollectedMetrics getMetricsHistory(
+            AbstractFlinkResource<?, ?> cr,
+            AutoScalerInfo scalingInformation,
+            FlinkService flinkService,
+            Configuration conf)
+            throws Exception {
+
+        var resourceID = ResourceID.fromResource(cr);
+        var currentJobStartTs =
+                Instant.ofEpochMilli(Long.parseLong(cr.getStatus().getJobStatus().getStartTime()));
+
+        if (!currentJobStartTs.equals(
+                scalingInformation.getJobStartTs().orElse(currentJobStartTs))) {
+            scalingInformation.clearMetricHistory();
+            cleanup(cr);
+        }
+
+        // Initialize metric history
+        var scalingMetricHistory =
+                histories.compute(
+                        resourceID,
+                        (k, h) -> {
+                            if (h == null) {
+                                h = scalingInformation.getMetricHistory();
+                            }
+                            return h.tailMap(
+                                    clock.instant()
+                                            .minus(conf.get(AutoScalerOptions.METRICS_WINDOW)));
+                        });
+
+        var topology = getJobTopology(flinkService, cr, conf);
+
+        // The filtered list of metrics we want to query for each vertex
+        var filteredVertexMetricNames = queryFilteredMetricNames(flinkService, cr, conf, topology);
+
+        // Aggregated job vertex metrics collected from Flink based on the filtered metric names
+        var collectedVertexMetrics =
+                queryAllAggregatedMetrics(cr, flinkService, conf, filteredVertexMetricNames);
+
+        // The computed scaling metrics based on the collected aggregated vertex metrics
+        var scalingMetrics =
+                convertToScalingMetrics(resourceID, collectedVertexMetrics, topology, conf);
+
+        // Add scaling metrics to history if they were computed successfully
+        scalingMetricHistory.put(clock.instant(), scalingMetrics);
+        scalingInformation.updateMetricHistory(currentJobStartTs, scalingMetricHistory);
+
+        return new CollectedMetrics(topology, scalingMetricHistory);
+    }
+
+    protected JobTopology getJobTopology(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, Configuration conf)
+            throws Exception {
+
+        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
+            var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
+            var topology =
+                    topologies.computeIfAbsent(
+                            ResourceID.fromResource(cr), r -> queryJobTopology(restClient, jobId));
+            updateKafkaSourceMaxParallelisms(restClient, jobId, topology);
+            return topology;
+        }
+    }
+
+    @VisibleForTesting
+    protected JobTopology queryJobTopology(RestClusterClient<String> restClient, JobID jobId) {
+        try {
+            var jobDetailsInfo = restClient.getJobDetails(jobId).get();
+
+            Map<JobVertexID, Integer> maxParallelismMap =
+                    jobDetailsInfo.getJobVertexInfos().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
+                                            JobDetailsInfo.JobVertexDetailsInfo
+                                                    ::getMaxParallelism));
+
+            return JobTopology.fromJsonPlan(jobDetailsInfo.getJsonPlan(), maxParallelismMap);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void updateKafkaSourceMaxParallelisms(
+            RestClusterClient<String> restClient, JobID jobId, JobTopology topology)
+            throws Exception {
+        for (Map.Entry<JobVertexID, Set<JobVertexID>> entry : topology.getInputs().entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                var sourceVertex = entry.getKey();
+                queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
+                        .map(AggregatedMetric::getId)
+                        .filter(s -> s.endsWith(".currentOffset"))
+                        .mapToInt(
+                                s -> {
+                                    // We extract the partition from the pattern:
+                                    // ...topic.[topic].partition.3.currentOffset
+                                    var split = s.split("\\.");
+                                    return Integer.parseInt(split[split.length - 2]);
+                                })
+                        .max()
+                        .ifPresent(
+                                p -> {
+                                    LOG.info(
+                                            "Updating source {} max parallelism based on available partitions to {}",
+                                            sourceVertex,
+                                            p + 1);
+                                    topology.updateMaxParallelism(sourceVertex, p + 1);
+                                });
+            }
+        }
+    }
+
+    private List<JobVertexID> getVertexList(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> cr, Configuration conf)
+            throws Exception {
+        JobTopology topology = getJobTopology(flinkService, cr, conf);
+        return new ArrayList<>(topology.getParallelisms().keySet());
+    }
+
+    /**
+     * Given a map of collected Flink vertex metrics we compute the scaling metrics for each job
+     * vertex.
+     *
+     * @param collectedMetrics Collected metrics for all job vertices.
+     * @return Computed scaling metrics for all job vertices.
+     */
+    private Map<JobVertexID, Map<ScalingMetric, Double>> convertToScalingMetrics(
+            ResourceID resourceID,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics,
+            JobTopology jobTopology,
+            Configuration conf) {
+
+        var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
+        collectedMetrics.forEach(
+                (jobVertexID, vertexFlinkMetrics) -> {
+                    LOG.info(
+                            "Calculating vertex scaling metrics for {} from {}",
+                            jobVertexID,
+                            vertexFlinkMetrics);
+                    var vertexScalingMetrics = new HashMap<ScalingMetric, Double>();
+                    out.put(jobVertexID, vertexScalingMetrics);
+
+                    ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
+                    ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics, vertexScalingMetrics);
+
+                    Optional<Double> lagGrowthRate =
+                            computeLagGrowthRate(
+                                    resourceID,
+                                    jobVertexID,
+                                    vertexScalingMetrics.get(ScalingMetric.LAG));
+
+                    ScalingMetrics.computeDataRateMetrics(
+                            jobVertexID,
+                            vertexFlinkMetrics,
+                            vertexScalingMetrics,
+                            jobTopology,
+                            lagGrowthRate,
+                            conf);
+
+                    LOG.info(
+                            "Vertex scaling metrics for {}: {}", jobVertexID, vertexScalingMetrics);
+                });
+
+        return out;
+    }
+
+    @NotNull
+    private Optional<Double> computeLagGrowthRate(
+            ResourceID resourceID, JobVertexID jobVertexID, Double currentLag) {
+        var metricHistory = histories.get(resourceID);
+
+        if (metricHistory == null || metricHistory.isEmpty()) {
+            return Optional.empty();
+        }
+
+        var lastCollectionTime = metricHistory.lastKey();
+        var lastCollectedMetrics = metricHistory.get(lastCollectionTime).get(jobVertexID);
+
+        if (lastCollectedMetrics == null) {
+            return Optional.empty();
+        }
+
+        var lastLag = lastCollectedMetrics.get(ScalingMetric.LAG);
+
+        if (lastLag == null || currentLag == null) {
+            return Optional.empty();
+        }
+
+        var timeDiff = Duration.between(lastCollectionTime, clock.instant()).toSeconds();
+        return Optional.of((currentLag - lastLag) / timeDiff);
+    }
+
+    /** Query the available metric names for each job vertex for the current spec generation. */
+    @SneakyThrows
+    protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            JobTopology topology) {
+
+        var jobId = JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
+        var vertices = getVertexList(flinkService, cr, conf);
+
+        long deployedGeneration = getDeployedGeneration(cr);
+
+        var previousMetricNames = availableVertexMetricNames.get(ResourceID.fromResource(cr));
+
+        if (previousMetricNames != null) {
+            if (deployedGeneration == previousMetricNames.f0) {
+                // We have already gathered the metric names for this spec, no need to query again
+                return previousMetricNames.f1;
+            } else {
+                availableVertexMetricNames.remove(ResourceID.fromResource(cr));
+            }
+        }
+
+        try (var restClient = (RestClusterClient<String>) flinkService.getClusterClient(conf)) {
+            var names =
+                    vertices.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            v -> v,
+                                            v ->
+                                                    getFilteredVertexMetricNames(
+                                                            restClient, jobId, v, topology, conf)));
+            availableVertexMetricNames.put(
+                    ResourceID.fromResource(cr), Tuple2.of(deployedGeneration, names));
+            return names;
+        }
+    }
+
+    public static long getDeployedGeneration(AbstractFlinkResource<?, ?> cr) {
+        return cr.getStatus()
+                .getReconciliationStatus()
+                .deserializeLastReconciledSpecWithMeta()
+                .getMeta()
+                .getMetadata()
+                .getGeneration();
+    }
+
+    /**
+     * Query and filter metric names for a given job vertex.
+     *
+     * @param restClient Flink rest client.
+     * @param jobID Job Id.
+     * @param jobVertexID Job Vertex Id.
+     * @return Map of filtered metric names.
+     */
+    @SneakyThrows
+    protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
+            RestClusterClient<?> restClient,
+            JobID jobID,
+            JobVertexID jobVertexID,
+            JobTopology topology,
+            Configuration conf) {
+
+        var allMetricNames = queryAggregatedMetricNames(restClient, jobID, jobVertexID);
+
+        var filteredMetrics = new HashMap<String, FlinkMetric>();
+        var requiredMetrics = new HashSet<FlinkMetric>();
+
+        requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);
+
+        if (topology.isSource(jobVertexID)) {
+            requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
+            if (conf.getBoolean(SOURCE_SCALING_ENABLED)) {
+                requiredMetrics.add(FlinkMetric.PENDING_RECORDS);
+            } else {
+                FlinkMetric.PENDING_RECORDS
+                        .findAny(allMetricNames)
+                        .ifPresent(m -> filteredMetrics.put(m, FlinkMetric.PENDING_RECORDS));
+                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
+                        .findAny(allMetricNames)
+                        .ifPresent(
+                                m ->
+                                        filteredMetrics.put(
+                                                m,
+                                                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
+            }
+        } else {
+            // Not a source so we must have numRecordsInPerSecond
+            requiredMetrics.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        }
+
+        if (!topology.getOutputs().get(jobVertexID).isEmpty()) {
+            // Not a sink so we must have numRecordsOutPerSecond
+            requiredMetrics.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+        }
+
+        requiredMetrics.forEach(
+                flinkMetric -> {
+                    filteredMetrics.put(
+                            flinkMetric
+                                    .findAny(allMetricNames)
+                                    .orElseThrow(
+                                            () ->
+                                                    new RuntimeException(
+                                                            "Could not find required metric "
+                                                                    + flinkMetric.name()
+                                                                    + " for "
+                                                                    + jobVertexID)),
+                            flinkMetric);
+                });
+
+        return filteredMetrics;
+    }
+
+    @VisibleForTesting
+    protected Collection<AggregatedMetric> queryAggregatedMetricNames(
+            RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID)
+            throws Exception {
+        var parameters = new AggregatedSubtaskMetricsParameters();
+        var pathIt = parameters.getPathParameters().iterator();
+
+        ((JobIDPathParameter) pathIt.next()).resolve(jobID);
+        ((JobVertexIdPathParameter) pathIt.next()).resolve(jobVertexID);
+
+        return restClient
+                .sendRequest(
+                        AggregatedSubtaskMetricsHeaders.getInstance(),
+                        parameters,
+                        EmptyRequestBody.getInstance())
+                .get()
+                .getMetrics();
+    }
+
+    protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
+            queryAllAggregatedMetrics(
+                    AbstractFlinkResource<?, ?> cr,
+                    FlinkService flinkService,
+                    Configuration conf,
+                    Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames);
+
+    @Override
+    public void cleanup(AbstractFlinkResource<?, ?> cr) {
+        LOG.info("Scaling metric cleanup");
+        var resourceId = ResourceID.fromResource(cr);
+        histories.remove(resourceId);
+        availableVertexMetricNames.remove(resourceId);
+        topologies.remove(resourceId);
+    }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = Preconditions.checkNotNull(clock);
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, FlinkMetric>>>>
+            getAvailableVertexMetricNames() {
+        return availableVertexMetricNames;
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceID, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>>
+            getHistories() {
+        return histories;
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceID, JobTopology> getTopologies() {
+        return topologies;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
new file mode 100644
index 00000000..76ba7a98
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * Enum representing the collected Flink metrics for autoscaling. The actual metric names depend on
+ * the JobGraph.
+ */
+public enum FlinkMetric {
+    BUSY_TIME_PER_SEC(s -> s.equals("busyTimeMsPerSecond")),
+    NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
+    NUM_RECORDS_OUT_PER_SEC(s -> s.equals("numRecordsOutPerSecond")),
+    SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC(
+            s -> s.startsWith("Source__") && s.endsWith(".numRecordsOutPerSecond")),
+    SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
+            s -> s.startsWith("Source__") && s.endsWith(".numRecordsInPerSecond")),
+    PENDING_RECORDS(s -> s.endsWith(".pendingRecords"));
+
+    FlinkMetric(Predicate<String> predicate) {
+        this.predicate = predicate;
+    }
+
+    public final Predicate<String> predicate;
+
+    public Optional<String> findAny(Collection<AggregatedMetric> metrics) {
+        return metrics.stream().map(AggregatedMetric::getId).filter(predicate).findAny();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
new file mode 100644
index 00000000..accecbe3
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/** IO metrics information. */
+public final class IOMetricsInfo {
+
+    public static final String FIELD_NAME_BYTES_READ = "read-bytes";
+
+    private static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete";
+
+    public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes";
+
+    private static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete";
+
+    public static final String FIELD_NAME_RECORDS_READ = "read-records";
+
+    private static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete";
+
+    public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records";
+
+    private static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
+
+    public static final String FIELD_NAME_ACC_BACK_PRESSURE = "accumulated-backpressured-time";
+
+    public static final String FIELD_NAME_ACC_IDLE = "accumulated-idle-time";
+
+    public static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time";
+
+    @JsonProperty(FIELD_NAME_BYTES_READ)
+    private final long bytesRead;
+
+    @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE)
+    private final boolean bytesReadComplete;
+
+    @JsonProperty(FIELD_NAME_BYTES_WRITTEN)
+    private final long bytesWritten;
+
+    @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE)
+    private final boolean bytesWrittenComplete;
+
+    @JsonProperty(FIELD_NAME_RECORDS_READ)
+    private final long recordsRead;
+
+    @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE)
+    private final boolean recordsReadComplete;
+
+    @JsonProperty(FIELD_NAME_RECORDS_WRITTEN)
+    private final long recordsWritten;
+
+    @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
+    private final boolean recordsWrittenComplete;
+
+    @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE)
+    private final Long accumulatedBackpressured;
+
+    @JsonProperty(FIELD_NAME_ACC_IDLE)
+    private final Long accumulatedIdle;
+
+    @JsonProperty(FIELD_NAME_ACC_BUSY)
+    private final Double accumulatedBusy;
+
+    @JsonCreator
+    public IOMetricsInfo(
+            @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
+            @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete,
+            @JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten,
+            @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete,
+            @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
+            @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
+            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
+            @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete,
+            @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) Long accumulatedBackpressured,
+            @JsonProperty(FIELD_NAME_ACC_IDLE) Long accumulatedIdle,
+            @JsonProperty(FIELD_NAME_ACC_BUSY) Double accumulatedBusy) {
+        this.bytesRead = bytesRead;
+        this.bytesReadComplete = bytesReadComplete;
+        this.bytesWritten = bytesWritten;
+        this.bytesWrittenComplete = bytesWrittenComplete;
+        this.recordsRead = recordsRead;
+        this.recordsReadComplete = recordsReadComplete;
+        this.recordsWritten = recordsWritten;
+        this.recordsWrittenComplete = recordsWrittenComplete;
+        this.accumulatedBackpressured = accumulatedBackpressured;
+        this.accumulatedIdle = accumulatedIdle;
+        this.accumulatedBusy = accumulatedBusy;
+    }
+
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public boolean isBytesReadComplete() {
+        return bytesReadComplete;
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    public boolean isBytesWrittenComplete() {
+        return bytesWrittenComplete;
+    }
+
+    public long getRecordsRead() {
+        return recordsRead;
+    }
+
+    public boolean isRecordsReadComplete() {
+        return recordsReadComplete;
+    }
+
+    public long getRecordsWritten() {
+        return recordsWritten;
+    }
+
+    public boolean isRecordsWrittenComplete() {
+        return recordsWrittenComplete;
+    }
+
+    public long getAccumulatedBackpressured() {
+        return accumulatedBackpressured;
+    }
+
+    public double getAccumulatedBusy() {
+        return accumulatedBusy;
+    }
+
+    public long getAccumulatedIdle() {
+        return accumulatedIdle;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IOMetricsInfo that = (IOMetricsInfo) o;
+        return bytesRead == that.bytesRead
+                && bytesReadComplete == that.bytesReadComplete
+                && bytesWritten == that.bytesWritten
+                && bytesWrittenComplete == that.bytesWrittenComplete
+                && recordsRead == that.recordsRead
+                && recordsReadComplete == that.recordsReadComplete
+                && recordsWritten == that.recordsWritten
+                && recordsWrittenComplete == that.recordsWrittenComplete
+                && accumulatedBackpressured == that.accumulatedBackpressured
+                && accumulatedBusy == that.accumulatedBusy
+                && accumulatedIdle == that.accumulatedIdle;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bytesRead,
+                bytesReadComplete,
+                bytesWritten,
+                bytesWrittenComplete,
+                recordsRead,
+                recordsReadComplete,
+                recordsWritten,
+                recordsWrittenComplete,
+                accumulatedBackpressured,
+                accumulatedBusy,
+                accumulatedIdle);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
new file mode 100644
index 00000000..11fc16a9
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for scaling metrics collection logic. */
+@EnableKubernetesMockClient(crud = true)
+public class MetricsCollectionAndEvaluationTest {
+
+    private ScalingMetricEvaluator evaluator;
+    private TestingFlinkService service;
+    private TestingMetricsCollector metricsCollector;
+    private ScalingExecutor scalingExecutor;
+
+    private FlinkDeployment app;
+    private Configuration conf;
+    private JobVertexID source1, source2, map, sink;
+    private JobTopology topology;
+
+    private KubernetesClient kubernetesClient;
+
+    @BeforeEach
+    public void setup() {
+        evaluator = new ScalingMetricEvaluator();
+        scalingExecutor = new ScalingExecutor(kubernetesClient);
+        service = new TestingFlinkService();
+
+        app = TestUtils.buildApplicationCluster();
+        app.getMetadata().setGeneration(1L);
+        app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
+        kubernetesClient.resource(app).createOrReplace();
+
+        source1 = new JobVertexID();
+        source2 = new JobVertexID();
+        map = new JobVertexID();
+        sink = new JobVertexID();
+
+        topology =
+                new JobTopology(
+                        new VertexInfo(source1, Set.of(), 2, 720),
+                        new VertexInfo(source2, Set.of(), 2, 720),
+                        new VertexInfo(map, Set.of(source1, source2), 12, 720),
+                        new VertexInfo(sink, Set.of(map), 8, 24));
+
+        metricsCollector = new TestingMetricsCollector(topology);
+
+        var confManager = new FlinkConfigManager(new Configuration());
+        conf = confManager.getDeployConfig(app.getMetadata(), app.getSpec());
+        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.SCALING_ENABLED, true);
+        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
+        app.getStatus().getJobStatus().setStartTime(String.valueOf(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testEndToEnd() throws Exception {
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+
+        setDefaultMetrics();
+        metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
+
+        // Test resetting the collector and make sure we can deserialize the scalingInfo correctly
+        metricsCollector = new TestingMetricsCollector(topology);
+        setDefaultMetrics();
+
+        var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        evaluator.setClock(clock);
+
+        var collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
+
+        var evaluation = evaluator.evaluate(conf, collectedMetrics);
+        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
+
+        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(2, scaledParallelism.get(source1));
+        assertEquals(2, scaledParallelism.get(source2));
+        assertEquals(6, scaledParallelism.get(map));
+        assertEquals(4, scaledParallelism.get(sink));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+        evaluation = evaluator.evaluate(conf, collectedMetrics);
+        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
+
+        scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(4, scaledParallelism.get(source1));
+        assertEquals(4, scaledParallelism.get(source2));
+        assertEquals(12, scaledParallelism.get(map));
+        assertEquals(8, scaledParallelism.get(sink));
+
+        var resourceID = ResourceID.fromResource(app);
+        assertNotNull(metricsCollector.getHistories().get(resourceID));
+        assertNotNull(metricsCollector.getTopologies().get(resourceID));
+
+        metricsCollector.cleanup(app);
+        scalingExecutor.cleanup(app);
+        assertNull(metricsCollector.getHistories().get(resourceID));
+        assertNull(metricsCollector.getAvailableVertexMetricNames().get(resourceID));
+        assertNull(metricsCollector.getTopologies().get(resourceID));
+        assertNull(metricsCollector.getTopologies().get(resourceID));
+    }
+
+    private void setDefaultMetrics() {
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
+                        source2,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
+                                FlinkMetric.PENDING_RECORDS,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 0.)),
+                        map,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 1000.)),
+                        sink,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric(
+                                        "", Double.NaN, Double.NaN, Double.NaN, 2000.))));
+    }
+
+    @Test
+    public void testKafkaPartitionMaxParallelism() throws Exception {
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+
+        setDefaultMetrics();
+        metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
+
+        var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        evaluator.setClock(clock);
+
+        var collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
+
+        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
+        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));
+
+        clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
+        metricsCollector.setClock(clock);
+        evaluator.setClock(clock);
+
+        metricsCollector.setMetricNames(
+                Map.of(
+                        source1,
+                        List.of(
+                                new AggregatedMetric(
+                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset"),
+                                new AggregatedMetric(
+                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.1.currentOffset"),
+                                new AggregatedMetric(
+                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.2.currentOffset"),
+                                new AggregatedMetric(
+                                        "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset"))));
+
+        collectedMetrics = metricsCollector.getMetricsHistory(app, scalingInfo, service, conf);
+        assertEquals(4, collectedMetrics.getJobTopology().getMaxParallelisms().get(source1));
+        assertEquals(720, collectedMetrics.getJobTopology().getMaxParallelisms().get(source2));
+    }
+
+    @Test
+    public void testJobDetailsRestCompatibility() throws JsonProcessingException {
+        String flink15Response =
+                "{\"jid\":\"068d4a00e4592099e94bb7a45f5bbd95\",\"name\":\"State machine job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487397898,\"end-time\":-1,\"duration\":82350,\"maxParallelism\":-1,\"now\":1667487480248,\"timestamps\":{\"RUNNING\":1667487398514,\"FAILING\":0,\"CANCELLING\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"SUSPENDED\":0,\"INITIALIZING\":1667487397898,\"CANCELED\":0,\"RECONCILING\":0,\"CREATED\":1667487398210},\"vertices\":[{\"id\" [...]
+        String flink16Response =
+                "{\"jid\":\"2667c218edfecda90ba9b4b23e8e14e1\",\"name\":\"State machine job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487688693,\"end-time\":-1,\"duration\":36646,\"maxParallelism\":-1,\"now\":1667487725339,\"timestamps\":{\"RESTARTING\":0,\"RECONCILING\":0,\"INITIALIZING\":1667487688693,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0,\"RUNNING\":1667487689116,\"FAILING\":0,\"FINISHED\":0,\"CREATED\":1667487688912,\"CANCELLING\":0},\"vertices\":[{\"id\" [...]
+
+        var flinkObjectMapper = new ObjectMapper();
+        flinkObjectMapper.readValue(flink15Response, JobDetailsInfo.class);
+        flinkObjectMapper.readValue(flink16Response, JobDetailsInfo.class);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
new file mode 100644
index 00000000..c487e116
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import lombok.Setter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Testing {@link ScalingMetricCollector} implementation. */
+public class TestingMetricsCollector extends ScalingMetricCollector {
+
+    @Setter private JobTopology jobTopology;
+
+    @Setter
+    private Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> currentMetrics = new HashMap<>();
+
+    @Setter private Map<JobVertexID, Collection<AggregatedMetric>> metricNames = new HashMap<>();
+
+    public TestingMetricsCollector(JobTopology jobTopology) {
+        this.jobTopology = jobTopology;
+    }
+
+    @Override
+    protected JobTopology queryJobTopology(RestClusterClient<String> restClient, JobID jobId) {
+        return jobTopology;
+    }
+
+    @Override
+    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(
+            AbstractFlinkResource<?, ?> cr,
+            FlinkService flinkService,
+            Configuration conf,
+            Map<JobVertexID, Map<String, FlinkMetric>> filteredVertexMetricNames) {
+        return currentMetrics;
+    }
+
+    @Override
+    protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
+            FlinkService flinkService,
+            AbstractFlinkResource<?, ?> cr,
+            Configuration conf,
+            JobTopology topology) {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    protected Collection<AggregatedMetric> queryAggregatedMetricNames(
+            RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID) {
+        return metricNames.getOrDefault(jobVertexID, Collections.emptyList());
+    }
+}


[flink-kubernetes-operator] 07/09: [FLINK-30260][autoscaler] Integrate autoscaler components with reconciler mechanism

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 4408c6658cf1b3dc21c7b3362720eb5879760696
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 08:38:14 2022 +0100

    [FLINK-30260][autoscaler] Integrate autoscaler components with reconciler mechanism
---
 docs/content/docs/custom-resource/reference.md        |  1 +
 .../kubernetes/operator/api/spec/FlinkVersion.java    |  3 ++-
 .../flink/kubernetes/operator/FlinkOperator.java      | 14 ++++++++++++--
 .../deployment/AbstractFlinkResourceReconciler.java   | 13 +++++++++++--
 .../reconciler/deployment/AbstractJobReconciler.java  |  6 ++++--
 .../reconciler/deployment/ApplicationReconciler.java  |  6 ++++--
 .../reconciler/deployment/ReconcilerFactory.java      | 12 +++++++++---
 .../reconciler/deployment/SessionReconciler.java      |  6 ++++--
 .../reconciler/sessionjob/SessionJobReconciler.java   |  6 ++++--
 .../operator/service/AbstractFlinkService.java        |  4 ++--
 .../kubernetes/operator/service/FlinkService.java     |  3 +++
 .../operator/utils/KubernetesClientUtils.java         | 19 +++++++++++++++++++
 .../kubernetes/operator/TestingFlinkService.java      |  2 +-
 .../controller/TestingFlinkDeploymentController.java  |  5 ++++-
 .../controller/TestingFlinkSessionJobController.java  |  5 ++++-
 .../sessionjob/FlinkSessionJobObserverTest.java       |  3 ++-
 .../deployment/ApplicationReconcilerTest.java         |  3 ++-
 .../ApplicationReconcilerUpgradeModeTest.java         |  3 ++-
 .../reconciler/deployment/SessionReconcilerTest.java  |  9 ++++++---
 .../sessionjob/SessionJobReconcilerTest.java          | 12 +++++++++---
 .../operator/service/NativeFlinkServiceTest.java      |  2 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml     |  1 +
 pom.xml                                               |  1 +
 23 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 8977e5e6..eaf4076c 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -82,6 +82,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | v1_14 |  |
 | v1_15 |  |
 | v1_16 |  |
+| v1_17 |  |
 
 ### IngressSpec
 **Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index 8b388361..951e4514 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -26,7 +26,8 @@ public enum FlinkVersion {
     v1_13,
     v1_14,
     v1_15,
-    v1_16;
+    v1_16,
+    v1_17;
 
     public boolean isNewerVersionThan(FlinkVersion otherVersion) {
         return this.ordinal() > otherVersion.ordinal();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index c6362cfb..adaa1562 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -143,7 +143,12 @@ public class FlinkOperator {
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
-                        client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
+                        client,
+                        flinkServiceFactory,
+                        configManager,
+                        eventRecorder,
+                        statusRecorder,
+                        metricGroup);
         var observerFactory =
                 new FlinkDeploymentObserverFactory(
                         flinkServiceFactory, configManager, statusRecorder, eventRecorder);
@@ -167,7 +172,12 @@ public class FlinkOperator {
         var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
         var reconciler =
                 new SessionJobReconciler(
-                        client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
+                        client,
+                        flinkServiceFactory,
+                        configManager,
+                        eventRecorder,
+                        statusRecorder,
+                        metricGroup);
         var observer =
                 new FlinkSessionJobObserver(flinkServiceFactory, configManager, eventRecorder);
         var controller =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index ea6a3048..94d154a1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -30,8 +30,10 @@ import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
@@ -72,6 +74,7 @@ public abstract class AbstractFlinkResourceReconciler<
     protected final EventRecorder eventRecorder;
     protected final StatusRecorder<CR, STATUS> statusRecorder;
     protected final KubernetesClient kubernetesClient;
+    protected final JobAutoScaler resourceScaler;
 
     public static final String MSG_SUSPENDED = "Suspending existing deployment.";
     public static final String MSG_SPEC_CHANGED =
@@ -85,11 +88,14 @@ public abstract class AbstractFlinkResourceReconciler<
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<CR, STATUS> statusRecorder) {
+            StatusRecorder<CR, STATUS> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
         this.kubernetesClient = kubernetesClient;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.statusRecorder = statusRecorder;
+        this.resourceScaler =
+                JobAutoScaler.create(kubernetesClient, configManager, operatorMetricGroup);
     }
 
     @Override
@@ -176,7 +182,9 @@ public abstract class AbstractFlinkResourceReconciler<
                     MSG_ROLLBACK);
             rollback(cr, ctx, observeConfig);
         } else if (!reconcileOtherChanges(cr, ctx, observeConfig)) {
-            LOG.info("Resource fully reconciled, nothing to do...");
+            if (!resourceScaler.scale(cr, flinkService, observeConfig, ctx)) {
+                LOG.info("Resource fully reconciled, nothing to do...");
+            }
         }
     }
 
@@ -253,6 +261,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
     @Override
     public final DeleteControl cleanup(CR resource, Context<?> context) {
+        resourceScaler.cleanup(resource);
         return cleanupInternal(resource, context);
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 3b777e4e..33b28413 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -58,8 +59,9 @@ public abstract class AbstractJobReconciler<
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<CR, STATUS> statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<CR, STATUS> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ee40da92..54463dc6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -69,8 +70,9 @@ public class ApplicationReconciler
             FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup);
         this.flinkService = flinkService;
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index bcca7fd1..d5a852b5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -41,6 +42,7 @@ public class ReconcilerFactory {
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder;
+    private final KubernetesOperatorMetricGroup operatorMetricGroup;
     private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>>
             reconcilerMap;
 
@@ -49,12 +51,14 @@ public class ReconcilerFactory {
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder) {
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
         this.kubernetesClient = kubernetesClient;
         this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.deploymentStatusRecorder = deploymentStatusRecorder;
+        this.operatorMetricGroup = operatorMetricGroup;
         this.reconcilerMap = new ConcurrentHashMap<>();
     }
 
@@ -71,14 +75,16 @@ public class ReconcilerFactory {
                                     flinkServiceFactory.getOrCreate(flinkApp),
                                     configManager,
                                     eventRecorder,
-                                    deploymentStatusRecorder);
+                                    deploymentStatusRecorder,
+                                    operatorMetricGroup);
                         case APPLICATION:
                             return new ApplicationReconciler(
                                     kubernetesClient,
                                     flinkServiceFactory.getOrCreate(flinkApp),
                                     configManager,
                                     eventRecorder,
-                                    deploymentStatusRecorder);
+                                    deploymentStatusRecorder,
+                                    operatorMetricGroup);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: %s", modes.f0));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 20f945e6..24e7ecea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -61,8 +62,9 @@ public class SessionReconciler
             FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup);
         this.flinkService = flinkService;
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index ad5c5735..1c1de979 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
@@ -52,8 +53,9 @@ public class SessionJobReconciler
             FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
-            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder) {
-        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+            StatusRecorder<FlinkSessionJob, FlinkSessionJobStatus> statusRecorder,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder, operatorMetricGroup);
         this.flinkServiceFactory = flinkServiceFactory;
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index fdf6991e..933ab745 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -647,8 +647,8 @@ public abstract class AbstractFlinkService implements FlinkService {
                         .toSeconds());
     }
 
-    @VisibleForTesting
-    protected ClusterClient<String> getClusterClient(Configuration conf) throws Exception {
+    @Override
+    public ClusterClient<String> getClusterClient(Configuration conf) throws Exception {
         final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
         final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
         final int port = conf.getInteger(RestOptions.PORT);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index ae3fb25b..6c75d127 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
@@ -102,4 +103,6 @@ public interface FlinkService {
 
     Map<String, String> getMetrics(Configuration conf, String jobId, List<String> metricNames)
             throws Exception;
+
+    ClusterClient<String> getClusterClient(Configuration conf) throws Exception;
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
index 119a3db8..46002af5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,10 +29,14 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
 import okhttp3.OkHttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Kubernetes client utils. */
 public class KubernetesClientUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesClientUtils.class);
+
     public static KubernetesClient getKubernetesClient(
             FlinkOperatorConfiguration operatorConfig, MetricGroup metricGroup) {
         return getKubernetesClient(operatorConfig, metricGroup, null);
@@ -63,4 +68,18 @@ public class KubernetesClientUtils {
 
         return clientBuilder.build();
     }
+
+    public static void replaceSpecAfterScaling(
+            KubernetesClient kubernetesClient, AbstractFlinkResource<?, ?> cr) {
+        var inKube = kubernetesClient.resource(cr).get();
+
+        if (cr.getMetadata().getGeneration() == inKube.getMetadata().getGeneration()) {
+            kubernetesClient
+                    .resource(cr)
+                    .lockResourceVersion(inKube.getMetadata().getResourceVersion())
+                    .replace();
+        } else {
+            LOG.info("Spec already upgrading in kube, skipping scale operation.");
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 9f654409..22efa1f0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -286,7 +286,7 @@ public class TestingFlinkService extends AbstractFlinkService {
     }
 
     @Override
-    protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
+    public ClusterClient<String> getClusterClient(Configuration config) throws Exception {
         TestingClusterClient<String> clusterClient = new TestingClusterClient<>(config);
         FlinkVersion flinkVersion = config.get(FlinkConfigBuilder.FLINK_VERSION);
         clusterClient.setListJobsFunction(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 19a924db..1a7151cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -82,7 +84,8 @@ public class TestingFlinkDeploymentController
                                 flinkServiceFactory,
                                 configManager,
                                 eventRecorder,
-                                statusRecorder),
+                                statusRecorder,
+                                TestUtils.createTestMetricGroup(new Configuration())),
                         new FlinkDeploymentObserverFactory(
                                 flinkServiceFactory, configManager, statusRecorder, eventRecorder),
                         statusRecorder,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index 116e147f..e80374ee 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
@@ -82,7 +84,8 @@ public class TestingFlinkSessionJobController
                                 flinkServiceFactory,
                                 configManager,
                                 eventRecorder,
-                                statusRecorder),
+                                statusRecorder,
+                                TestUtils.createTestMetricGroup(new Configuration())),
                         new FlinkSessionJobObserver(
                                 flinkServiceFactory, configManager, eventRecorder),
                         statusRecorder,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 9c426861..d9c057d7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -79,7 +79,8 @@ public class FlinkSessionJobObserverTest {
                         flinkServiceFactory,
                         configManager,
                         eventRecorder,
-                        statusRecorder);
+                        statusRecorder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 6c2f6ee2..a4f4acbc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -112,7 +112,8 @@ public class ApplicationReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        statusRecorder);
+                        statusRecorder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 2445741f..0e8549a0 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -80,7 +80,8 @@ public class ApplicationReconcilerUpgradeModeTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        statusRecoder);
+                        statusRecoder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @ParameterizedTest
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index f5ee07b7..1b0b6b07 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -80,7 +80,8 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new TestingStatusRecorder<>());
+                        new TestingStatusRecorder<>(),
+                        TestUtils.createTestMetricGroup(new Configuration()));
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         kubernetesClient.resource(deployment).createOrReplace();
         reconciler.reconcile(deployment, flinkService.getContext());
@@ -96,7 +97,8 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new TestingStatusRecorder<>());
+                        new TestingStatusRecorder<>(),
+                        TestUtils.createTestMetricGroup(new Configuration()));
 
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         kubernetesClient.resource(deployment).createOrReplace();
@@ -145,7 +147,8 @@ public class SessionReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        new TestingStatusRecorder<>());
+                        new TestingStatusRecorder<>(),
+                        TestUtils.createTestMetricGroup(new Configuration()));
 
         FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
         ObjectMeta deployMeta = flinkApp.getMetadata();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index b9309c9f..f6d8e81f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -107,14 +107,15 @@ public class SessionJobReconcilerTest {
                     }
                 };
         statusRecoder = new TestingStatusRecorder<>();
+        kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
         reconciler =
                 new SessionJobReconciler(
                         kubernetesClient,
                         flinkServiceFactory,
                         configManager,
                         eventRecorder,
-                        statusRecoder);
-        kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
+                        statusRecoder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
     }
 
     @Test
@@ -610,7 +611,12 @@ public class SessionJobReconcilerTest {
         // Force upgrade when savepoint is in progress.
         reconciler =
                 new SessionJobReconciler(
-                        null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
+                        null,
+                        flinkServiceFactory,
+                        configManager,
+                        eventRecorder,
+                        statusRecoder,
+                        TestUtils.createTestMetricGroup(new Configuration()));
         spSessionJob.getSpec().getJob().setParallelism(100);
         reconciler.reconcile(spSessionJob, readyContext);
         assertEquals(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 234a3e33..a0ba485c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -502,7 +502,7 @@ public class NativeFlinkServiceTest {
     private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
         return new NativeFlinkService(client, new FlinkConfigManager(configuration)) {
             @Override
-            protected ClusterClient<String> getClusterClient(Configuration config) {
+            public ClusterClient<String> getClusterClient(Configuration config) {
                 return clusterClient;
             }
         };
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index ef023346..5212526f 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -40,6 +40,7 @@ spec:
                 - v1_14
                 - v1_15
                 - v1_16
+                - v1_17
                 type: string
               ingress:
                 properties:
diff --git a/pom.xml b/pom.xml
index d1de2e4d..f3576a7b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@ under the License.
         <module>flink-kubernetes-docs</module>
         <module>examples/flink-sql-runner-example</module>
         <module>examples/kubernetes-client-examples</module>
+        <module>examples/autoscaling</module>
     </modules>
 
     <properties>