You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:16:03 UTC
[flink-kubernetes-operator] 04/09: [FLINK-30260][autoscaler] Add scaling metric evaluation logic
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 0b3cc1312e9070ac0a28cea76e1c6d9284f52acd
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 08:59:46 2022 +0100
[FLINK-30260][autoscaler] Add scaling metric evaluation logic
---
.../autoscaler/ScalingMetricEvaluator.java | 228 +++++++++++++
.../autoscaler/BacklogBasedScalingTest.java | 352 +++++++++++++++++++++
.../autoscaler/ScalingMetricEvaluatorTest.java | 227 +++++++++++++
3 files changed, 807 insertions(+)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
new file mode 100644
index 00000000..fdf5e33f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.math3.stat.StatUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Job scaling evaluator for autoscaler. */
+public class ScalingMetricEvaluator {
+
+ // Class-wide SLF4J logger.
+ private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);
+
+ // Time source read by getAverage to position the averaging window; replaceable via setClock.
+ private Clock clock = Clock.systemDefaultZone();
+
+ /**
+  * Evaluates the scaling metrics of every vertex in the job topology.
+  *
+  * <p>Vertices are visited in topological order and each result is stored before the next
+  * vertex is processed, so the evaluation of a vertex can read the results of its inputs.
+  *
+  * @param conf configuration forwarded to the per-vertex evaluation
+  * @param collectedMetrics job topology together with the collected metric history
+  * @return evaluated metrics keyed by vertex and then by scaling metric
+  */
+ public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate(
+         Configuration conf, CollectedMetrics collectedMetrics) {
+
+     var history = collectedMetrics.getMetricHistory();
+     var jobTopology = collectedMetrics.getJobTopology();
+     var evaluatedByVertex =
+             new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
+
+     for (var vertexId : jobTopology.getVerticesInTopologicalOrder()) {
+         var vertexSummary =
+                 computeVertexScalingSummary(
+                         conf, evaluatedByVertex, history, jobTopology, vertexId);
+         evaluatedByVertex.put(vertexId, vertexSummary);
+     }
+
+     return evaluatedByVertex;
+ }
+
+ /**
+  * Evaluates all scaling metrics of a single vertex.
+  *
+  * <p>Current values are read from the most recent history entry, averages come from {@link
+  * #getAverage}. The output related metrics ({@code TRUE_OUTPUT_RATE}, {@code OUTPUT_RATIO}) are
+  * only added for vertices that have at least one output.
+  *
+  * @param conf configuration used for averaging and threshold computation
+  * @param scalingOutput results of the vertices evaluated so far
+  * @param metricsHistory collected metrics keyed by collection time
+  * @param topology job topology
+  * @param vertex vertex to evaluate
+  * @return the evaluated metrics of the vertex
+  */
+ @NotNull
+ private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary(
+         Configuration conf,
+         HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
+         SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+         JobTopology topology,
+         JobVertexID vertex) {
+
+     var latest = metricsHistory.get(metricsHistory.lastKey()).get(vertex);
+     var summary = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+     // Kept ahead of the threshold computation below, which operates on the same map.
+     computeTargetDataRate(
+             topology, vertex, conf, scalingOutput, metricsHistory, latest, summary);
+
+     var processingRate =
+             new EvaluatedScalingMetric(
+                     latest.get(TRUE_PROCESSING_RATE),
+                     getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory, conf));
+     summary.put(TRUE_PROCESSING_RATE, processingRate);
+
+     var parallelism = EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex));
+     summary.put(PARALLELISM, parallelism);
+     var maxParallelism = EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex));
+     summary.put(MAX_PARALLELISM, maxParallelism);
+
+     computeProcessingRateThresholds(summary, conf);
+
+     if (topology.getOutputs().get(vertex).isEmpty()) {
+         // A vertex without outputs gets no output rate / output ratio entries.
+         return summary;
+     }
+
+     var outputRate =
+             new EvaluatedScalingMetric(
+                     latest.get(TRUE_OUTPUT_RATE),
+                     getAverage(TRUE_OUTPUT_RATE, vertex, metricsHistory, conf));
+     summary.put(TRUE_OUTPUT_RATE, outputRate);
+
+     var outputRatio =
+             new EvaluatedScalingMetric(
+                     latest.get(OUTPUT_RATIO),
+                     getAverage(OUTPUT_RATIO, vertex, metricsHistory, conf));
+     summary.put(OUTPUT_RATIO, outputRatio);
+
+     return summary;
+ }
+
+ /**
+  * Computes the scale up and scale down processing rate thresholds and stores them in the
+  * given metric map as {@code SCALE_UP_RATE_THRESHOLD} and {@code SCALE_DOWN_RATE_THRESHOLD}.
+  *
+  * <p>The scale up threshold is requested for {@code TARGET_UTILIZATION +
+  * TARGET_UTILIZATION_BOUNDARY}, the scale down threshold for {@code TARGET_UTILIZATION -
+  * TARGET_UTILIZATION_BOUNDARY}. Both are obtained from {@code
+  * AutoScalerUtils.getTargetProcessingCapacity}.
+  *
+  * @param metrics already evaluated metrics of the vertex; receives the two thresholds
+  * @param conf configuration providing the target utilization and its boundary
+  */
+ @VisibleForTesting
+ protected static void computeProcessingRateThresholds(
+         Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf) {
+
+     double boundary = conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
+     double upperUtilization = conf.get(TARGET_UTILIZATION) + boundary;
+     double lowerUtilization = conf.get(TARGET_UTILIZATION) - boundary;
+
+     // Both capacities are computed before either threshold is written into the map.
+     double upThreshold =
+             AutoScalerUtils.getTargetProcessingCapacity(
+                     metrics, conf, upperUtilization, false);
+     double downThreshold =
+             AutoScalerUtils.getTargetProcessingCapacity(
+                     metrics, conf, lowerUtilization, true);
+
+     metrics.put(SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(upThreshold));
+     metrics.put(SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(downThreshold));
+ }
+
+ /**
+  * Computes {@code TARGET_DATA_RATE} and {@code CATCH_UP_DATA_RATE} for a vertex and stores
+  * them in {@code out}.
+  *
+  * <p>For a vertex with inputs, both values are the sum over all inputs of the input's
+  * respective rate multiplied by the input's average {@code OUTPUT_RATIO}.
+  *
+  * <p>For a vertex without inputs, the target rate is taken from the {@code TARGET_DATA_RATE}
+  * metric if the latest metrics contain it and from {@code SOURCE_DATA_RATE} otherwise. The
+  * catch up rate is the latest {@code LAG} (0 when absent) divided by the configured catch up
+  * duration in whole seconds, or 0 when that duration is 0 seconds.
+  *
+  * @param topology job topology
+  * @param vertex vertex to compute the rates for
+  * @param conf configuration providing the catch up duration and the averaging window
+  * @param alreadyEvaluated results of the vertices evaluated so far, read for the inputs
+  * @param metricsHistory collected metrics keyed by collection time
+  * @param latestVertexMetrics most recent collected metrics of the vertex
+  * @param out map receiving the two computed metrics
+  * @throws RuntimeException if a vertex without inputs has neither of the two rate metrics
+  */
+ private void computeTargetDataRate(
+         JobTopology topology,
+         JobVertexID vertex,
+         Configuration conf,
+         HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated,
+         SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+         Map<ScalingMetric, Double> latestVertexMetrics,
+         Map<ScalingMetric, EvaluatedScalingMetric> out) {
+
+     var upstreamVertices = topology.getInputs().get(vertex);
+
+     if (!upstreamVertices.isEmpty()) {
+         // Downstream vertex: propagate the rates of the inputs scaled by their output ratio.
+         double currentTotal = 0;
+         double averageTotal = 0;
+         double catchUpTotal = 0;
+         for (var upstream : upstreamVertices) {
+             var upstreamMetrics = alreadyEvaluated.get(upstream);
+             var upstreamTargetRate = upstreamMetrics.get(TARGET_DATA_RATE);
+             var ratio = upstreamMetrics.get(OUTPUT_RATIO).getAverage();
+             currentTotal += upstreamTargetRate.getCurrent() * ratio;
+             averageTotal += upstreamTargetRate.getAverage() * ratio;
+             catchUpTotal += upstreamMetrics.get(CATCH_UP_DATA_RATE).getCurrent() * ratio;
+         }
+         out.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(currentTotal, averageTotal));
+         out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpTotal));
+         return;
+     }
+
+     // Source vertex from here on.
+     double catchUpSeconds = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
+
+     ScalingMetric rateMetric = SOURCE_DATA_RATE;
+     if (latestVertexMetrics.containsKey(TARGET_DATA_RATE)) {
+         rateMetric = TARGET_DATA_RATE;
+     }
+     if (!latestVertexMetrics.containsKey(rateMetric)) {
+         throw new RuntimeException(
+                 "Cannot evaluate metrics without source target rate information");
+     }
+
+     var targetRate =
+             new EvaluatedScalingMetric(
+                     latestVertexMetrics.get(rateMetric),
+                     getAverage(rateMetric, vertex, metricsHistory, conf));
+     out.put(TARGET_DATA_RATE, targetRate);
+
+     double currentLag = latestVertexMetrics.getOrDefault(LAG, 0.);
+     double catchUpRate;
+     if (catchUpSeconds == 0) {
+         catchUpRate = 0;
+     } else {
+         catchUpRate = currentLag / catchUpSeconds;
+     }
+     if (catchUpRate > 0) {
+         LOG.info("Extra backlog processing input rate for {} is {}", vertex, catchUpRate);
+     }
+     out.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
+ }
+
+ /**
+  * Averages the recorded values of a metric for one vertex over the configured metrics window.
+  *
+  * <p>Only history entries with a timestamp at or after {@code clock.instant() -
+  * METRICS_WINDOW} are considered. Entries that hold no data for the vertex, that have no
+  * (or a null) value for the metric, or whose value is NaN are skipped instead of failing
+  * with a {@link NullPointerException}.
+  *
+  * @param metric metric to average
+  * @param jobVertexId vertex whose samples are averaged
+  * @param metricsHistory collected metrics keyed by collection time
+  * @param conf configuration providing the metrics window
+  * @return the mean of the remaining samples as computed by {@code StatUtils.mean};
+  *     NOTE(review): commons-math documents NaN for an empty array, so NaN is expected when
+  *     no sample qualifies - confirm callers tolerate that
+  */
+ private double getAverage(
+         ScalingMetric metric,
+         JobVertexID jobVertexId,
+         SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricsHistory,
+         Configuration conf) {
+     var windowStart = clock.instant().minus(conf.get(AutoScalerOptions.METRICS_WINDOW));
+     return StatUtils.mean(
+             metricsHistory.tailMap(windowStart).values().stream()
+                     .map(entry -> entry.get(jobVertexId))
+                     // An entry may lack the vertex altogether; skip it rather than NPE.
+                     .filter(vertexMetrics -> vertexMetrics != null)
+                     .map(vertexMetrics -> vertexMetrics.get(metric))
+                     // Covers both a missing metric and an explicitly stored null value.
+                     .filter(value -> value != null && !value.isNaN())
+                     .mapToDouble(Double::doubleValue)
+                     .toArray());
+ }
+
+ /**
+  * Replaces the clock used to position the averaging window.
+  *
+  * @param clock clock to use from now on; rejected by {@code Preconditions.checkNotNull} if
+  *     null
+  */
+ @VisibleForTesting
+ protected void setClock(Clock clock) {
+     Preconditions.checkNotNull(clock);
+     this.clock = clock;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
new file mode 100644
index 00000000..1e1466c8
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Test for backlog-based (lag-driven) autoscaling decisions. */
+@EnableKubernetesMockClient(crud = true)
+public class BacklogBasedScalingTest {
+
+ // Autoscaler components, re-created in setup() and pinned to a common clock by setClocksTo().
+ private ScalingMetricEvaluator evaluator;
+ private TestingFlinkService service;
+ private TestingMetricsCollector metricsCollector;
+ private ScalingExecutor scalingExecutor;
+
+ // Deployment under test and the two vertices (source1 -> sink) of its job topology.
+ private FlinkDeployment app;
+ private JobVertexID source1, sink;
+
+ private FlinkConfigManager confManager;
+ private JobAutoScaler autoscaler;
+
+ // NOTE(review): never assigned in this class - presumably injected by the
+ // @EnableKubernetesMockClient extension declared on the class; confirm.
+ private KubernetesClient kubernetesClient;
+
+ /** Builds a fresh deployment, a two-vertex topology and the autoscaler under test. */
+ @BeforeEach
+ public void setup() {
+     // Components that are wired into the autoscaler at the end of this method.
+     service = new TestingFlinkService();
+     scalingExecutor = new ScalingExecutor(kubernetesClient);
+     evaluator = new ScalingMetricEvaluator();
+
+     // Initial topology: source1 feeding sink.
+     source1 = new JobVertexID();
+     sink = new JobVertexID();
+     var initialTopology =
+             new JobTopology(
+                     new VertexInfo(source1, Set.of(), 1, 720),
+                     new VertexInfo(sink, Set.of(source1), 1, 720));
+     metricsCollector = new TestingMetricsCollector(initialTopology);
+
+     // Deployment stored in the (mock) cluster before its status is updated further below.
+     app = TestUtils.buildApplicationCluster();
+     app.getMetadata().setGeneration(1L);
+     app.getStatus().getJobStatus().setJobId(new JobID().toHexString());
+     kubernetesClient.resource(app).createOrReplace();
+
+     var baseConf = new Configuration();
+     baseConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+     baseConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+     baseConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1));
+     baseConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+     baseConf.set(AutoScalerOptions.SCALING_ENABLED, true);
+     baseConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+     baseConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
+     baseConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
+     baseConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+     confManager = new FlinkConfigManager(baseConf);
+
+     // Mark the spec as deployed and the job as running.
+     var deployConfig = confManager.getDeployConfig(app.getMetadata(), app.getSpec());
+     ReconciliationUtils.updateStatusForDeployedSpec(app, deployConfig);
+     app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+
+     var metricGroup = TestUtils.createTestMetricGroup(new Configuration());
+     autoscaler =
+             new JobAutoScaler(
+                     kubernetesClient,
+                     confManager,
+                     metricsCollector,
+                     evaluator,
+                     scalingExecutor,
+                     metricGroup);
+ }
+
+ /**
+  * Walks the autoscaler through a backlog scenario: scale up from parallelism 1 to 4 while
+  * pending records exist, stay at 4 while the backlog shrinks, scale down to 2 once it is
+  * gone, and stay at 2 through a later, smaller backlog.
+  *
+  * <p>Each step advances all clocks by one second, reports a new set of metrics through
+  * {@link #reportMetrics} and triggers one autoscaler run.
+  */
+ @Test
+ public void test() throws Exception {
+     var ctx = createAutoscalerTestContext();
+     var now = Instant.now();
+     setClocksTo(now);
+     app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+     reportMetrics(850., 500., 2000.);
+
+     autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx);
+     assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     autoscaler.scale(app, service, confManager.getObserveConfig(app), ctx);
+
+     var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(4, scaledParallelism.get(source1));
+     assertEquals(4, scaledParallelism.get(sink));
+
+     // Report the scaled-up topology; the backlog has grown in the meantime.
+     metricsCollector.setJobTopology(
+             new JobTopology(
+                     new VertexInfo(source1, Set.of(), 4, 24),
+                     new VertexInfo(sink, Set.of(source1), 4, 720)));
+     reportMetrics(1000., 1800., 2500.);
+
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+     autoscaler.scale(
+             app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+     assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+     scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(4, scaledParallelism.get(source1));
+     assertEquals(4, scaledParallelism.get(sink));
+
+     // Backlog is shrinking but not gone yet.
+     reportMetrics(1000., 1800., 1200.);
+
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     autoscaler.scale(
+             app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+     assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
+     scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(4, scaledParallelism.get(source1));
+     assertEquals(4, scaledParallelism.get(sink));
+
+     // We have finally caught up to our original lag, time to scale down
+     reportMetrics(600., 800., 0.);
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     autoscaler.scale(
+             app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+
+     scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(2, scaledParallelism.get(source1));
+     assertEquals(2, scaledParallelism.get(sink));
+     metricsCollector.setJobTopology(
+             new JobTopology(
+                     new VertexInfo(source1, Set.of(), 2, 24),
+                     new VertexInfo(sink, Set.of(source1), 2, 720)));
+
+     // A new, smaller backlog right after the restart with the scaled-down topology.
+     reportMetrics(1000., 900., 500.);
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     app.getStatus().getJobStatus().setStartTime(String.valueOf(now.toEpochMilli()));
+     autoscaler.scale(
+             app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+     scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(2, scaledParallelism.get(source1));
+     assertEquals(2, scaledParallelism.get(sink));
+
+     reportMetrics(1000., 900., 100.);
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     autoscaler.scale(
+             app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+     scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(2, scaledParallelism.get(source1));
+     assertEquals(2, scaledParallelism.get(sink));
+
+     reportMetrics(500., 500., 0.);
+     now = now.plus(Duration.ofSeconds(1));
+     setClocksTo(now);
+     autoscaler.scale(
+             app, service, confManager.getObserveConfig(app), createAutoscalerTestContext());
+     scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+     assertEquals(2, scaledParallelism.get(source1));
+     assertEquals(2, scaledParallelism.get(sink));
+ }
+
+ /**
+  * Makes the metrics collector report one set of metrics for source1 and sink.
+  *
+  * <p>Both vertices get the same busy time; the source's records in/out rate and the sink's
+  * records in rate are all set to {@code recordsPerSec}; pending records are reported for
+  * the source only. NOTE(review): the busy time is passed as the fourth constructor argument
+  * of {@code AggregatedMetric} and all other values as the fifth - presumably the avg and sum
+  * slots; confirm against the Flink class.
+  *
+  * @param busyTime value reported for {@code BUSY_TIME_PER_SEC} on both vertices
+  * @param recordsPerSec value reported for the record rate metrics
+  * @param pendingRecords value reported for {@code PENDING_RECORDS} on the source
+  */
+ private void reportMetrics(double busyTime, double recordsPerSec, double pendingRecords) {
+     metricsCollector.setCurrentMetrics(
+             Map.of(
+                     source1,
+                     Map.of(
+                             FlinkMetric.BUSY_TIME_PER_SEC,
+                             new AggregatedMetric(
+                                     "", Double.NaN, Double.NaN, busyTime, Double.NaN),
+                             FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                             new AggregatedMetric(
+                                     "", Double.NaN, Double.NaN, Double.NaN, recordsPerSec),
+                             FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                             new AggregatedMetric(
+                                     "", Double.NaN, Double.NaN, Double.NaN, recordsPerSec),
+                             FlinkMetric.PENDING_RECORDS,
+                             new AggregatedMetric(
+                                     "", Double.NaN, Double.NaN, Double.NaN, pendingRecords)),
+                     sink,
+                     Map.of(
+                             FlinkMetric.BUSY_TIME_PER_SEC,
+                             new AggregatedMetric(
+                                     "", Double.NaN, Double.NaN, busyTime, Double.NaN),
+                             FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                             new AggregatedMetric(
+                                     "", Double.NaN, Double.NaN, Double.NaN, recordsPerSec))));
+ }
+
+ /** Pins the metrics collector, the evaluator and the scaling executor to one fixed instant. */
+ private void setClocksTo(Instant time) {
+     var fixedClock = Clock.fixed(time, ZoneId.systemDefault());
+     scalingExecutor.setClock(fixedClock);
+     evaluator.setClock(fixedClock);
+     metricsCollector.setClock(fixedClock);
+ }
+
+ @NotNull
+ private TestUtils.TestingContext<HasMetadata> createAutoscalerTestContext() {
+ return new TestUtils.TestingContext<>() {
+ public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
+ return (Set)
+ kubernetesClient.configMaps().inAnyNamespace().list().getItems().stream()
+ .collect(Collectors.toSet());
+ }
+ };
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
new file mode 100644
index 00000000..7a4af640
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Scaling evaluator test. */
+public class ScalingMetricEvaluatorTest {
+
+ /**
+  * Verifies target and catch up data rates for a source -> sink topology under different
+  * catch up durations and restart times, and for a history without lag.
+  */
+ @Test
+ public void testLagBasedSourceScaling() {
+     var source = new JobVertexID();
+     var sink = new JobVertexID();
+
+     var topology =
+             new JobTopology(
+                     new VertexInfo(source, Collections.emptySet(), 1, 1),
+                     new VertexInfo(sink, Set.of(source), 1, 1));
+
+     var evaluator = new ScalingMetricEvaluator();
+
+     var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+
+     // The history is keyed by timestamp. Two separate Instant.now() calls may return the same
+     // value on a coarse clock, in which case the second sample would replace the first and the
+     // expected averages below would no longer hold. Use explicitly distinct keys instead.
+     var firstSampleTime = Instant.now();
+     var secondSampleTime = firstSampleTime.plusMillis(1);
+
+     metricHistory.put(
+             firstSampleTime,
+             Map.of(
+                     source,
+                     Map.of(
+                             SOURCE_DATA_RATE, 100.,
+                             LAG, 0.,
+                             OUTPUT_RATIO, 2.,
+                             TRUE_OUTPUT_RATE, 200.,
+                             TRUE_PROCESSING_RATE, 200.),
+                     sink,
+                     Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+     metricHistory.put(
+             secondSampleTime,
+             Map.of(
+                     source,
+                     Map.of(
+                             SOURCE_DATA_RATE, 200.,
+                             LAG, 1000.,
+                             OUTPUT_RATIO, 2.,
+                             TRUE_OUTPUT_RATE, 200.,
+                             TRUE_PROCESSING_RATE, 200.),
+                     sink,
+                     Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+     var conf = new Configuration();
+
+     conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+     conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+     var evaluatedMetrics =
+             evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+     assertEquals(
+             new EvaluatedScalingMetric(200, 150),
+             evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(500),
+             evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+     assertEquals(
+             new EvaluatedScalingMetric(400, 300),
+             evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(1000),
+             evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+     // Halving the catch up duration doubles the catch up rates.
+     conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
+     evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+     assertEquals(
+             new EvaluatedScalingMetric(200, 150),
+             evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(1000),
+             evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+     assertEquals(
+             new EvaluatedScalingMetric(400, 300),
+             evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(2000),
+             evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+     // Restart time should not affect evaluated metrics
+     conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
+
+     evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+     assertEquals(
+             new EvaluatedScalingMetric(200, 150),
+             evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(1000),
+             evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+     assertEquals(
+             new EvaluatedScalingMetric(400, 300),
+             evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(2000),
+             evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+     // Turn off lag based scaling
+     conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+     evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+     assertEquals(
+             new EvaluatedScalingMetric(200, 150),
+             evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(0), evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));
+     assertEquals(
+             new EvaluatedScalingMetric(400, 300),
+             evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+     assertEquals(
+             EvaluatedScalingMetric.of(0), evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
+
+     // Test 0 lag
+     metricHistory.clear();
+     metricHistory.put(
+             Instant.now(),
+             Map.of(
+                     source,
+                     Map.of(
+                             SOURCE_DATA_RATE, 100.,
+                             LAG, 0.,
+                             OUTPUT_RATIO, 2.,
+                             TRUE_OUTPUT_RATE, 200.,
+                             TRUE_PROCESSING_RATE, 200.),
+                     sink,
+                     Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+     conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
+     evaluatedMetrics = evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
+     assertEquals(
+             new EvaluatedScalingMetric(100, 100),
+             evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+     assertEquals(
+             new EvaluatedScalingMetric(200, 200),
+             evaluatedMetrics.get(sink).get(TARGET_DATA_RATE));
+ }
+
+ /**
+  * Checks the scale up / scale down thresholds produced for a target utilization of 0.8 with
+  * a boundary of 0.1, first with catch up disabled and then with a 2 second catch up duration.
+  */
+ @Test
+ public void testUtilizationBoundaryComputation() {
+     var conf = new Configuration();
+     conf.set(TARGET_UTILIZATION, 0.8);
+     conf.set(TARGET_UTILIZATION_BOUNDARY, 0.1);
+     conf.set(RESTART_TIME, Duration.ofSeconds(1));
+     conf.set(CATCH_UP_DURATION, Duration.ZERO);
+
+     // Default behaviour, restart time does not factor in
+     var withoutCatchUp = getThresholds(700, 0, conf);
+     assertEquals(Tuple2.of(778.0, 1000.0), withoutCatchUp);
+
+     conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
+
+     var withBacklog = getThresholds(700, 350, conf);
+     assertEquals(Tuple2.of(1128.0, 1700.0), withBacklog);
+
+     var withoutBacklog = getThresholds(700, 0, conf);
+     assertEquals(Tuple2.of(778.0, 1350.0), withoutBacklog);
+ }
+
+ /**
+  * Runs the threshold computation on a metric map that holds only the given target data
+  * rate (as average, with a NaN current value) and catch up data rate.
+  *
+  * @return the current values of the scale up and scale down thresholds, in that order
+  */
+ private Tuple2<Double, Double> getThresholds(
+         double inputTargetRate, double catchUpRate, Configuration conf) {
+     var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+     metrics.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
+     metrics.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN, inputTargetRate));
+
+     ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+
+     var scaleUp = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+     var scaleDown = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+     return Tuple2.of(scaleUp, scaleDown);
+ }
+}