You are viewing a plain-text version of this content. The canonical link to the original message (attached to the word "here" in the HTML version) was not preserved in this copy.
Posted to commits@flink.apache.org by fa...@apache.org on 2023/07/20 11:19:59 UTC
[flink-kubernetes-operator] 01/01: [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module
This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch decoupling-autoscaler-k8s-v2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 2fd674cb7c64ded9f43dd60319cc03b59350dc84
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Thu Jul 20 18:58:53 2023 +0800
[FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module
---
flink-autoscaler/pom.xml | 52 +++++++++++++++++++++
.../flink}/autoscaler/AutoscalerFlinkMetrics.java | 24 +++++++---
.../flink}/autoscaler/ScalingMetricEvaluator.java | 53 +++++++++++-----------
.../apache/flink}/autoscaler/ScalingSummary.java | 8 ++--
.../autoscaler/config/AutoScalerOptions.java | 10 ++--
.../autoscaler/metrics/CollectedMetricHistory.java | 4 +-
.../autoscaler/metrics/CollectedMetrics.java | 2 +-
.../org/apache/flink}/autoscaler/metrics/Edge.java | 2 +-
.../autoscaler/metrics/EvaluatedScalingMetric.java | 2 +-
.../flink}/autoscaler/metrics/FlinkMetric.java | 2 +-
.../autoscaler/metrics/MetricAggregator.java | 2 +-
.../flink}/autoscaler/metrics/ScalingMetric.java | 2 +-
.../flink}/autoscaler/metrics/ScalingMetrics.java | 8 ++--
.../flink}/autoscaler/topology/JobTopology.java | 2 +-
.../flink}/autoscaler/topology/VertexInfo.java | 2 +-
.../autoscaler/utils/AutoScalerSerDeModule.java | 4 +-
.../flink}/autoscaler/utils/AutoScalerUtils.java | 17 ++++---
.../autoscaler/ScalingMetricEvaluatorTest.java | 44 +++++++++---------
.../autoscaler/metrics/ScalingMetricsTest.java | 8 ++--
.../autoscaler/utils/AutoScalerUtilsTest.java | 4 +-
.../src/test/resources/log4j2-test.properties | 26 +++++++++++
flink-kubernetes-operator-autoscaler/pom.xml | 6 +++
.../operator/autoscaler/AutoScalerInfo.java | 7 +--
.../operator/autoscaler/JobAutoScalerImpl.java | 18 ++++----
.../autoscaler/JobAutoscalerFactoryImpl.java | 1 +
.../operator/autoscaler/JobVertexScaler.java | 29 ++++++------
.../autoscaler/RestApiMetricsCollector.java | 2 +-
.../operator/autoscaler/ScalingExecutor.java | 19 ++++----
.../autoscaler/ScalingMetricCollector.java | 16 +++----
.../operator/autoscaler/AutoScalerInfoTest.java | 9 ++--
.../autoscaler/BacklogBasedScalingTest.java | 14 +++---
.../operator/autoscaler/JobAutoScalerImplTest.java | 12 +++--
.../operator/autoscaler/JobTopologyTest.java | 2 +-
.../operator/autoscaler/JobVertexScalerTest.java | 8 ++--
.../MetricsCollectionAndEvaluationTest.java | 15 +++---
.../autoscaler/RecommendedParallelismTest.java | 15 +++---
.../autoscaler/RestApiMetricsCollectorTest.java | 2 +-
.../operator/autoscaler/ScalingExecutorTest.java | 8 ++--
.../autoscaler/ScalingMetricCollectorTest.java | 6 +--
.../autoscaler/TestingMetricsCollector.java | 4 +-
pom.xml | 1 +
41 files changed, 293 insertions(+), 179 deletions(-)
diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml
new file mode 100644
index 00000000..458ba878
--- /dev/null
+++ b/flink-autoscaler/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-kubernetes-operator-parent</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-autoscaler</artifactId>
+ <name>Flink Autoscaler</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <jackson.version>2.15.0</jackson.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <version>${jackson.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
similarity index 90%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
index fab3cb32..1a0f0671 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,11 +37,11 @@ public class AutoscalerFlinkMetrics {
private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class);
- final Counter numScalings;
+ private final Counter numScalings;
- final Counter numErrors;
+ private final Counter numErrors;
- final Counter numBalanced;
+ private final Counter numBalanced;
private final MetricGroup metricGroup;
@@ -54,6 +54,18 @@ public class AutoscalerFlinkMetrics {
this.metricGroup = metricGroup;
}
+ public Counter getNumScalings() {
+ return numScalings;
+ }
+
+ public Counter getNumErrors() {
+ return numErrors;
+ }
+
+ public Counter getNumBalanced() {
+ return numBalanced;
+ }
+
public void registerScalingMetrics(
Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
currentVertexMetrics) {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
similarity index 82%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index af6d1732..7cb0bf1c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -15,44 +15,45 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.commons.math3.stat.StatUtils;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Job scaling evaluator for autoscaler. */
public class ScalingMetricEvaluator {
@@ -110,7 +111,7 @@ public class ScalingMetricEvaluator {
});
}
- @NotNull
+ @Nonnull
private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
@@ -154,7 +155,7 @@ public class ScalingMetricEvaluator {
}
@VisibleForTesting
- protected static void computeProcessingRateThresholds(
+ public static void computeProcessingRateThresholds(
Map<ScalingMetric, EvaluatedScalingMetric> metrics,
Configuration conf,
boolean processingBacklog) {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
similarity index 85%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
index 4ff7a6e9..50a3d17b 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
@@ -26,7 +26,7 @@ import lombok.NoArgsConstructor;
import java.util.Map;
-/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */
+/** Scaling summary returned by the {@link org.apache.flink.autoscaler.ScalingMetricEvaluator}. */
@Data
@NoArgsConstructor
public class ScalingSummary {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
similarity index 96%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index b31a9bec..68f64467 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -15,22 +15,22 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
import java.time.Duration;
import java.util.List;
-import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
/** Config options related to the autoscaler module. */
public class AutoScalerOptions {
+ public static final String K8S_OP_CONF_PREFIX = "kubernetes.operator.";
+
private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
- return operatorConfig("job.autoscaler." + key);
+ return ConfigOptions.key(K8S_OP_CONF_PREFIX + "job.autoscaler." + key);
}
public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
similarity index 88%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
index 43bbf47b..31a19879 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.JobTopology;
import lombok.Data;
import lombok.Setter;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
similarity index 94%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
index 97e3f9fe..bfd85ca2 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
similarity index 94%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
index c89692a9..1fe938b8 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
index 23788886..c60adf18 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
import lombok.Data;
import lombok.NoArgsConstructor;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
similarity index 97%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 81609775..7f28d947 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
index c79d4b7d..e147d211 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
similarity index 97%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 608e4f1c..b6a3e17d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
/**
* Supported scaling metrics. These represent high level metrics computed from Flink job metrics
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
similarity index 97%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 592d0602..afaa21ae 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
similarity index 99%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
index 3f160193..1f2f5a63 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.topology;
+package org.apache.flink.autoscaler.topology;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
index a261ca6d..249be598 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.topology;
+package org.apache.flink.autoscaler.topology;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
similarity index 95%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
index fad516a5..ce991403 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.Edge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.fasterxml.jackson.core.JsonGenerator;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
similarity index 82%
rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
index 54b6bef3..176ad582 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import java.util.ArrayList;
@@ -30,9 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-
/** AutoScaler utilities. */
public class AutoScalerUtils {
@@ -45,7 +42,8 @@ public class AutoScalerUtils {
// Target = Lag Catchup Rate + Restart Catchup Rate + Processing at utilization
// Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL
- double lagCatchupTargetRate = evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent();
+ double lagCatchupTargetRate =
+ evaluatedMetrics.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent();
if (Double.isNaN(lagCatchupTargetRate)) {
return Double.NaN;
}
@@ -56,7 +54,8 @@ public class AutoScalerUtils {
targetUtilization = Math.max(0., targetUtilization);
targetUtilization = Math.min(1., targetUtilization);
- double avgInputTargetRate = evaluatedMetrics.get(TARGET_DATA_RATE).getAverage();
+ double avgInputTargetRate =
+ evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE).getAverage();
if (Double.isNaN(avgInputTargetRate)) {
return Double.NaN;
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
similarity index 86%
rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index c67b27fe..13b19445 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Test;
@@ -39,19 +39,19 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
similarity index 98%
rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 0ae3e2b7..85a72e5a 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
similarity index 94%
rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
index 90588994..a037258a 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Test;
diff --git a/flink-autoscaler/src/test/resources/log4j2-test.properties b/flink-autoscaler/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..47b66644
--- /dev/null
+++ b/flink-autoscaler/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all messages at INFO level and above to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}
diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml
index 66307e8c..4b77ee05 100644
--- a/flink-kubernetes-operator-autoscaler/pom.xml
+++ b/flink-kubernetes-operator-autoscaler/pom.xml
@@ -36,6 +36,12 @@ under the License.
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-autoscaler</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index 55c0b0ac..c37b299f 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -18,11 +18,12 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index fbefd15a..12904af4 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -36,9 +38,9 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
/** Application and SessionJob autoscaler. */
public class JobAutoScalerImpl implements JobAutoScaler {
@@ -158,16 +160,16 @@ public class JobAutoScalerImpl implements JobAutoScaler {
scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics);
if (specAdjusted) {
- flinkMetrics.numScalings.inc();
+ flinkMetrics.getNumScalings().inc();
} else {
- flinkMetrics.numBalanced.inc();
+ flinkMetrics.getNumBalanced().inc();
}
autoScalerInfo.replaceInKubernetes(kubernetesClient);
return specAdjusted;
} catch (Throwable e) {
LOG.error("Error while scaling resource", e);
- flinkMetrics.numErrors.inc();
+ flinkMetrics.getNumErrors().inc();
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Warning,
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
index 277796e5..5ebe9670 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 497bb270..d5351fd3 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -18,12 +18,13 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -37,16 +38,16 @@ import java.time.ZoneId;
import java.util.Map;
import java.util.SortedMap;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Component responsible for computing vertex parallelism based on the scaling metrics. */
public class JobVertexScaler {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
index 158bb5a2..dca52a0c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
@@ -18,9 +18,9 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 8ede4d99..f2ec20d6 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -18,11 +18,12 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -38,12 +39,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Class responsible for executing scaling decisions. */
public class ScalingExecutor {
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 3ad1db39..e620fb0e 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -19,17 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index cfc2e7f5..94ada9fb 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -17,11 +17,12 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.fasterxml.jackson.core.JsonProcessingException;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 2c76ae7d..c9e67b71 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -19,14 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
@@ -441,7 +443,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
AutoscalerFlinkMetrics autoscalerFlinkMetrics =
autoscaler.flinkMetrics.get(
ResourceID.fromResource(getResourceContext(app, ctx).getResource()));
- assertEquals(scalingCount, autoscalerFlinkMetrics.numScalings.getCount());
- assertEquals(balancedCount, autoscalerFlinkMetrics.numBalanced.getCount());
+ assertEquals(scalingCount, autoscalerFlinkMetrics.getNumScalings().getCount());
+ assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalanced().getCount());
}
}
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index bb46f610..f1749ad9 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -38,8 +38,8 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -76,12 +76,14 @@ public class JobAutoScalerImplTest extends OperatorTestBase {
ResourceID resourceId = ResourceID.fromResource(app);
autoscaler.scale(resourceContext);
- Assertions.assertEquals(1, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount());
+ Assertions.assertEquals(
+ 1, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount());
autoscaler.scale(resourceContext);
- Assertions.assertEquals(2, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount());
+ Assertions.assertEquals(
+ 2, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount());
- assertEquals(0, autoscaler.flinkMetrics.get(resourceId).numScalings.getCount());
+ assertEquals(0, autoscaler.flinkMetrics.get(resourceId).getNumScalings().getCount());
}
@Test
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
index f76fadce..bd64be28 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
@@ -17,7 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index fb4655df..82eb2e45 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -17,12 +17,14 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 0d973b00..f7dbb723 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -19,17 +19,18 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
index 23ee4055..5246eefe 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
@@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
@@ -52,9 +53,9 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerTestUtils.getOrCreateInfo;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
index 04d83107..9f3cb451 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
@@ -18,11 +18,11 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 2191f723..a5ada0c4 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -18,13 +18,15 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
index ddf26453..3c11a7f6 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
@@ -19,13 +19,13 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
index 9a3c1d13..277ee33b 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
@@ -18,11 +18,11 @@
package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
diff --git a/pom.xml b/pom.xml
index 330d1d7e..4ba9a6de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@ under the License.
<module>flink-kubernetes-operator-api</module>
<module>flink-kubernetes-webhook</module>
<module>flink-kubernetes-docs</module>
+ <module>flink-autoscaler</module>
<module>examples/flink-sql-runner-example</module>
<module>examples/flink-beam-example</module>
<module>examples/kubernetes-client-examples</module>