You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2023/07/20 11:19:59 UTC

[flink-kubernetes-operator] 01/01: [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module

This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch decoupling-autoscaler-k8s-v2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 2fd674cb7c64ded9f43dd60319cc03b59350dc84
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Thu Jul 20 18:58:53 2023 +0800

    [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module
---
 flink-autoscaler/pom.xml                           | 52 +++++++++++++++++++++
 .../flink}/autoscaler/AutoscalerFlinkMetrics.java  | 24 +++++++---
 .../flink}/autoscaler/ScalingMetricEvaluator.java  | 53 +++++++++++-----------
 .../apache/flink}/autoscaler/ScalingSummary.java   |  8 ++--
 .../autoscaler/config/AutoScalerOptions.java       | 10 ++--
 .../autoscaler/metrics/CollectedMetricHistory.java |  4 +-
 .../autoscaler/metrics/CollectedMetrics.java       |  2 +-
 .../org/apache/flink}/autoscaler/metrics/Edge.java |  2 +-
 .../autoscaler/metrics/EvaluatedScalingMetric.java |  2 +-
 .../flink}/autoscaler/metrics/FlinkMetric.java     |  2 +-
 .../autoscaler/metrics/MetricAggregator.java       |  2 +-
 .../flink}/autoscaler/metrics/ScalingMetric.java   |  2 +-
 .../flink}/autoscaler/metrics/ScalingMetrics.java  |  8 ++--
 .../flink}/autoscaler/topology/JobTopology.java    |  2 +-
 .../flink}/autoscaler/topology/VertexInfo.java     |  2 +-
 .../autoscaler/utils/AutoScalerSerDeModule.java    |  4 +-
 .../flink}/autoscaler/utils/AutoScalerUtils.java   | 17 ++++---
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 44 +++++++++---------
 .../autoscaler/metrics/ScalingMetricsTest.java     |  8 ++--
 .../autoscaler/utils/AutoScalerUtilsTest.java      |  4 +-
 .../src/test/resources/log4j2-test.properties      | 26 +++++++++++
 flink-kubernetes-operator-autoscaler/pom.xml       |  6 +++
 .../operator/autoscaler/AutoScalerInfo.java        |  7 +--
 .../operator/autoscaler/JobAutoScalerImpl.java     | 18 ++++----
 .../autoscaler/JobAutoscalerFactoryImpl.java       |  1 +
 .../operator/autoscaler/JobVertexScaler.java       | 29 ++++++------
 .../autoscaler/RestApiMetricsCollector.java        |  2 +-
 .../operator/autoscaler/ScalingExecutor.java       | 19 ++++----
 .../autoscaler/ScalingMetricCollector.java         | 16 +++----
 .../operator/autoscaler/AutoScalerInfoTest.java    |  9 ++--
 .../autoscaler/BacklogBasedScalingTest.java        | 14 +++---
 .../operator/autoscaler/JobAutoScalerImplTest.java | 12 +++--
 .../operator/autoscaler/JobTopologyTest.java       |  2 +-
 .../operator/autoscaler/JobVertexScalerTest.java   |  8 ++--
 .../MetricsCollectionAndEvaluationTest.java        | 15 +++---
 .../autoscaler/RecommendedParallelismTest.java     | 15 +++---
 .../autoscaler/RestApiMetricsCollectorTest.java    |  2 +-
 .../operator/autoscaler/ScalingExecutorTest.java   |  8 ++--
 .../autoscaler/ScalingMetricCollectorTest.java     |  6 +--
 .../autoscaler/TestingMetricsCollector.java        |  4 +-
 pom.xml                                            |  1 +
 41 files changed, 293 insertions(+), 179 deletions(-)

diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml
new file mode 100644
index 00000000..458ba878
--- /dev/null
+++ b/flink-autoscaler/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <version>1.6-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-autoscaler</artifactId>
+    <name>Flink Autoscaler</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jackson.version>2.15.0</jackson.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <version>${jackson.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
similarity index 90%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
index fab3cb32..1a0f0671 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,11 +37,11 @@ public class AutoscalerFlinkMetrics {
 
     private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class);
 
-    final Counter numScalings;
+    private final Counter numScalings;
 
-    final Counter numErrors;
+    private final Counter numErrors;
 
-    final Counter numBalanced;
+    private final Counter numBalanced;
 
     private final MetricGroup metricGroup;
 
@@ -54,6 +54,18 @@ public class AutoscalerFlinkMetrics {
         this.metricGroup = metricGroup;
     }
 
+    public Counter getNumScalings() {
+        return numScalings;
+    }
+
+    public Counter getNumErrors() {
+        return numErrors;
+    }
+
+    public Counter getNumBalanced() {
+        return numBalanced;
+    }
+
     public void registerScalingMetrics(
             Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
                     currentVertexMetrics) {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
similarity index 82%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index af6d1732..7cb0bf1c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -15,44 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.apache.commons.math3.stat.StatUtils;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Job scaling evaluator for autoscaler. */
 public class ScalingMetricEvaluator {
@@ -110,7 +111,7 @@ public class ScalingMetricEvaluator {
                         });
     }
 
-    @NotNull
+    @Nonnull
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
             Configuration conf,
             HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
@@ -154,7 +155,7 @@ public class ScalingMetricEvaluator {
     }
 
     @VisibleForTesting
-    protected static void computeProcessingRateThresholds(
+    public static void computeProcessingRateThresholds(
             Map<ScalingMetric, EvaluatedScalingMetric> metrics,
             Configuration conf,
             boolean processingBacklog) {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
similarity index 85%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
index 4ff7a6e9..50a3d17b 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.Data;
@@ -26,7 +26,7 @@ import lombok.NoArgsConstructor;
 
 import java.util.Map;
 
-/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */
+/** Scaling summary returned by the {@link org.apache.flink.autoscaler.ScalingMetricEvaluator}. */
 @Data
 @NoArgsConstructor
 public class ScalingSummary {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
similarity index 96%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index b31a9bec..68f64467 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+    public static final String K8S_OP_CONF_PREFIX = "kubernetes.operator.";
+
     private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-        return operatorConfig("job.autoscaler." + key);
+        return ConfigOptions.key(K8S_OP_CONF_PREFIX + "job.autoscaler." + key);
     }
 
     public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
similarity index 88%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
index 43bbf47b..31a19879 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.JobTopology;
 
 import lombok.Data;
 import lombok.Setter;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
similarity index 94%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
index 97e3f9fe..bfd85ca2 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
similarity index 94%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
index c89692a9..1fe938b8 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
index 23788886..c60adf18 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
similarity index 97%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 81609775..7f28d947 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
index c79d4b7d..e147d211 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
similarity index 97%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 608e4f1c..b6a3e17d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 /**
  * Supported scaling metrics. These represent high level metrics computed from Flink job metrics
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
similarity index 97%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 592d0602..afaa21ae 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
similarity index 99%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
index 3f160193..1f2f5a63 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.topology;
+package org.apache.flink.autoscaler.topology;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
index a261ca6d..249be598 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.topology;
+package org.apache.flink.autoscaler.topology;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
index fad516a5..ce991403 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
 
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.Edge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
similarity index 82%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
index 54b6bef3..176ad582 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.ArrayList;
@@ -30,9 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-
 /** AutoScaler utilities. */
 public class AutoScalerUtils {
 
@@ -45,7 +42,8 @@ public class AutoScalerUtils {
         // Target = Lag Catchup Rate + Restart Catchup Rate + Processing at utilization
         // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL
 
-        double lagCatchupTargetRate = evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent();
+        double lagCatchupTargetRate =
+                evaluatedMetrics.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent();
         if (Double.isNaN(lagCatchupTargetRate)) {
             return Double.NaN;
         }
@@ -56,7 +54,8 @@ public class AutoScalerUtils {
         targetUtilization = Math.max(0., targetUtilization);
         targetUtilization = Math.min(1., targetUtilization);
 
-        double avgInputTargetRate = evaluatedMetrics.get(TARGET_DATA_RATE).getAverage();
+        double avgInputTargetRate =
+                evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE).getAverage();
         if (Double.isNaN(avgInputTargetRate)) {
             return Double.NaN;
         }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
similarity index 86%
rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index c67b27fe..13b19445 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Test;
@@ -39,19 +39,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
similarity index 98%
rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 0ae3e2b7..85a72e5a 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
similarity index 94%
rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
index 90588994..a037258a 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Test;
diff --git a/flink-autoscaler/src/test/resources/log4j2-test.properties b/flink-autoscaler/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..47b66644
--- /dev/null
+++ b/flink-autoscaler/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all messages at INFO level and above to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}
diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml
index 66307e8c..4b77ee05 100644
--- a/flink-kubernetes-operator-autoscaler/pom.xml
+++ b/flink-kubernetes-operator-autoscaler/pom.xml
@@ -36,6 +36,12 @@ under the License.
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>io.fabric8</groupId>
             <artifactId>kubernetes-client</artifactId>
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index 55c0b0ac..c37b299f 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -18,11 +18,12 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index fbefd15a..12904af4 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -36,9 +38,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 
 /** Application and SessionJob autoscaler. */
 public class JobAutoScalerImpl implements JobAutoScaler {
@@ -158,16 +160,16 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                     scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
 
             if (specAdjusted) {
-                flinkMetrics.numScalings.inc();
+                flinkMetrics.getNumScalings().inc();
             } else {
-                flinkMetrics.numBalanced.inc();
+                flinkMetrics.getNumBalanced().inc();
             }
 
             autoScalerInfo.replaceInKubernetes(kubernetesClient);
             return specAdjusted;
         } catch (Throwable e) {
             LOG.error("Error while scaling resource", e);
-            flinkMetrics.numErrors.inc();
+            flinkMetrics.getNumErrors().inc();
             eventRecorder.triggerEvent(
                     resource,
                     EventRecorder.Type.Warning,
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
index 277796e5..5ebe9670 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 497bb270..d5351fd3 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -18,12 +18,13 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -37,16 +38,16 @@ import java.time.ZoneId;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Component responsible for computing vertex parallelism based on the scaling metrics. */
 public class JobVertexScaler {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
index 158bb5a2..dca52a0c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
@@ -18,9 +18,9 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 8ede4d99..f2ec20d6 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -18,11 +18,12 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -38,12 +39,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
 public class ScalingExecutor {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 3ad1db39..e620fb0e 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -19,17 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index cfc2e7f5..94ada9fb 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 2c76ae7d..c9e67b71 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -19,14 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
@@ -441,7 +443,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
         AutoscalerFlinkMetrics autoscalerFlinkMetrics =
                 autoscaler.flinkMetrics.get(
                         ResourceID.fromResource(getResourceContext(app, ctx).getResource()));
-        assertEquals(scalingCount, autoscalerFlinkMetrics.numScalings.getCount());
-        assertEquals(balancedCount, autoscalerFlinkMetrics.numBalanced.getCount());
+        assertEquals(scalingCount, autoscalerFlinkMetrics.getNumScalings().getCount());
+        assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalanced().getCount());
     }
 }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index bb46f610..f1749ad9 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -38,8 +38,8 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Map;
 
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -76,12 +76,14 @@ public class JobAutoScalerImplTest extends OperatorTestBase {
         ResourceID resourceId = ResourceID.fromResource(app);
 
         autoscaler.scale(resourceContext);
-        Assertions.assertEquals(1, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount());
+        Assertions.assertEquals(
+                1, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount());
 
         autoscaler.scale(resourceContext);
-        Assertions.assertEquals(2, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount());
+        Assertions.assertEquals(
+                2, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount());
 
-        assertEquals(0, autoscaler.flinkMetrics.get(resourceId).numScalings.getCount());
+        assertEquals(0, autoscaler.flinkMetrics.get(resourceId).getNumScalings().getCount());
     }
 
     @Test
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
index f76fadce..bd64be28 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index fb4655df..82eb2e45 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 0d973b00..f7dbb723 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -19,17 +19,18 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
index 23ee4055..5246eefe 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
@@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
@@ -52,9 +53,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerTestUtils.getOrCreateInfo;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
index 04d83107..9f3cb451 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
@@ -18,11 +18,11 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 2191f723..a5ada0c4 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -18,13 +18,15 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
index ddf26453..3c11a7f6 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
@@ -19,13 +19,13 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
index 9a3c1d13..277ee33b 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
@@ -18,11 +18,11 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
diff --git a/pom.xml b/pom.xml
index 330d1d7e..4ba9a6de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@ under the License.
         <module>flink-kubernetes-operator-api</module>
         <module>flink-kubernetes-webhook</module>
         <module>flink-kubernetes-docs</module>
+        <module>flink-autoscaler</module>
         <module>examples/flink-sql-runner-example</module>
         <module>examples/flink-beam-example</module>
         <module>examples/kubernetes-client-examples</module>