You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:16:04 UTC
[flink-kubernetes-operator] 05/09: [FLINK-30260][autoscaler] Add scaling execution logic
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 37bf5107a957110cfa4289858f32a0baa00d2dbf
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 1 09:00:40 2022 +0100
[FLINK-30260][autoscaler] Add scaling execution logic
---
.../operator/autoscaler/ScalingExecutor.java | 362 +++++++++++++++++++++
.../operator/autoscaler/ScalingSummary.java | 51 +++
.../operator/autoscaler/ScalingExecutorTest.java | 350 ++++++++++++++++++++
3 files changed, 763 insertions(+)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
new file mode 100644
index 00000000..afd9d2fc
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/** Class responsible for executing scaling decisions. */
+public class ScalingExecutor implements Cleanup {
+
+ public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
+ ConfigOptions.key("pipeline.jobvertex-parallelism-overrides")
+ .mapType()
+ .defaultValue(Collections.emptyMap())
+ .withDescription(
+ "A parallelism override map (jobVertexId -> parallelism) which will be used to update"
+ + " the parallelism of the corresponding job vertices of submitted JobGraphs.");
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
+
+ private final KubernetesClient kubernetesClient;
+
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+
+ public ScalingExecutor(KubernetesClient kubernetesClient) {
+ this.kubernetesClient = kubernetesClient;
+ }
+
+ public boolean scaleResource(
+ AbstractFlinkResource<?, ?> resource,
+ AutoScalerInfo scalingInformation,
+ Configuration conf,
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics)
+ throws Exception {
+
+ if (!conf.get(SCALING_ENABLED)) {
+ return false;
+ }
+
+ if (!stabilizationPeriodPassed(resource, conf)) {
+ return false;
+ }
+
+ var scalingHistory = scalingInformation.getScalingHistory();
+ var scalingSummaries = computeScalingSummary(conf, evaluatedMetrics, scalingHistory);
+ if (scalingSummaries.isEmpty()) {
+ LOG.info("All job vertices are currently running at their target parallelism.");
+ return false;
+ }
+
+ if (allVerticesWithinUtilizationTarget(evaluatedMetrics, scalingSummaries)) {
+ return false;
+ }
+
+ LOG.info("Scaling vertices:");
+ scalingSummaries.forEach(
+ (v, s) ->
+ LOG.info(
+ "{} | Parallelism {} -> {}",
+ v,
+ s.getCurrentParallelism(),
+ s.getNewParallelism()));
+
+ setVertexParallelismOverrides(resource, evaluatedMetrics, scalingSummaries);
+
+ KubernetesClientUtils.replaceSpecAfterScaling(kubernetesClient, resource);
+ scalingInformation.addToScalingHistory(clock.instant(), scalingSummaries);
+
+ return true;
+ }
+
+ private boolean stabilizationPeriodPassed(
+ AbstractFlinkResource<?, ?> resource, Configuration conf) {
+ var now = clock.instant();
+ var startTs =
+ Instant.ofEpochMilli(
+ Long.parseLong(resource.getStatus().getJobStatus().getStartTime()));
+ var stableTime = startTs.plus(conf.get(STABILIZATION_INTERVAL));
+
+ if (stableTime.isAfter(now)) {
+ LOG.info("Waiting until {} to stabilize before new scale operation.", stableTime);
+ return false;
+ }
+ return true;
+ }
+
+ protected static boolean allVerticesWithinUtilizationTarget(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+ Map<JobVertexID, ScalingSummary> scalingSummaries) {
+
+ for (Map.Entry<JobVertexID, ScalingSummary> entry : scalingSummaries.entrySet()) {
+ var vertex = entry.getKey();
+ var scalingSummary = entry.getValue();
+ var metrics = evaluatedMetrics.get(vertex);
+
+ double processingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
+ double scaleUpRateThreshold = metrics.get(SCALE_UP_RATE_THRESHOLD).getCurrent();
+ double scaleDownRateThreshold = metrics.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent();
+
+ if (processingRate < scaleUpRateThreshold || processingRate > scaleDownRateThreshold) {
+ LOG.info(
+ "Vertex {}(pCurr={}, pNew={}) processing rate {} is outside ({}, {})",
+ vertex,
+ scalingSummary.getCurrentParallelism(),
+ scalingSummary.getNewParallelism(),
+ processingRate,
+ scaleUpRateThreshold,
+ scaleDownRateThreshold);
+ return false;
+ } else {
+ LOG.debug(
+ "Vertex {}(pCurr={}, pNew={}) processing rate {} is within target ({}, {})",
+ vertex,
+ scalingSummary.getCurrentParallelism(),
+ scalingSummary.getNewParallelism(),
+ processingRate,
+ scaleUpRateThreshold,
+ scaleDownRateThreshold);
+ }
+ }
+ LOG.info("All vertex processing rates are within target.");
+ return true;
+ }
+
+ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
+ Configuration conf,
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+ Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
+
+ var out = new HashMap<JobVertexID, ScalingSummary>();
+ evaluatedMetrics.forEach(
+ (v, metrics) -> {
+ var currentParallelism =
+ (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
+ var newParallelism =
+ computeScaleTargetParallelism(
+ conf,
+ v,
+ metrics,
+ scalingHistory.getOrDefault(v, Collections.emptySortedMap()));
+ if (currentParallelism != newParallelism) {
+ out.put(v, new ScalingSummary(currentParallelism, newParallelism, metrics));
+ }
+ });
+
+ return out;
+ }
+
+ protected int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+
+ if (!history.isEmpty()) {
+ if (detectImmediateScaleDownAfterScaleUp(
+ conf, history, currentParallelism, newParallelism)) {
+ LOG.info(
+ "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+ vertex,
+ currentParallelism);
+ newParallelism = currentParallelism;
+ }
+
+ // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
+ // history
+ // currentParallelism 1 => 3 -> empiricalProcRate = 800
+ // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+ }
+
+ return newParallelism;
+ }
+
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean isScaleDown = newParallelism < currentParallelism;
+ boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+
+ boolean withinConfiguredTime =
+ Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+
+ return isScaleDown && lastScaleUp && withinConfiguredTime;
+ }
+
+ public static int scale(
+ int parallelism,
+ int numKeyGroups,
+ double scaleFactor,
+ int minParallelism,
+ int maxParallelism) {
+ Preconditions.checkArgument(
+ minParallelism <= maxParallelism,
+ "The minimum parallelism must not be greater than the maximum parallelism.");
+ if (minParallelism > numKeyGroups) {
+ LOG.warn(
+ "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
+ minParallelism,
+ numKeyGroups);
+ }
+ if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
+ LOG.warn(
+ "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
+ maxParallelism,
+ numKeyGroups);
+ }
+
+ int newParallelism =
+ // Prevent integer overflow when converting from double to integer.
+ // We do not have to detect underflow because doubles cannot
+ // underflow.
+ (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
+
+ // Cap parallelism at either number of key groups or parallelism limit
+ final int upperBound = Math.min(numKeyGroups, maxParallelism);
+
+ // Apply min/max parallelism
+ newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
+
+ // Try to adjust the parallelism such that it divides the number of key groups without a
+ // remainder => state is evenly spread across subtasks
+ for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+ if (numKeyGroups % p == 0) {
+ return p;
+ }
+ }
+
+ // If key group adjustment fails, use originally computed parallelism
+ return newParallelism;
+ }
+
+ private void setVertexParallelismOverrides(
+ AbstractFlinkResource<?, ?> resource,
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
+ Map<JobVertexID, ScalingSummary> summaries) {
+ var flinkConf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
+ var overrides = new HashMap<String, String>();
+ evaluatedMetrics.forEach(
+ (id, metrics) -> {
+ if (summaries.containsKey(id)) {
+ overrides.put(
+ id.toHexString(),
+ String.valueOf(summaries.get(id).getNewParallelism()));
+ } else {
+ overrides.put(
+ id.toHexString(),
+ String.valueOf(
+ (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent()));
+ }
+ });
+ flinkConf.set(PARALLELISM_OVERRIDES, overrides);
+
+ resource.getSpec().setFlinkConfiguration(flinkConf.toMap());
+ }
+
+ @VisibleForTesting
+ protected void setClock(Clock clock) {
+ this.clock = Preconditions.checkNotNull(clock);
+ }
+
+ @Override
+ public void cleanup(AbstractFlinkResource<?, ?> cr) {
+ // No cleanup is currently necessary
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
new file mode 100644
index 00000000..7def1716
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */
+@Data
+@NoArgsConstructor
+public class ScalingSummary {
+
+ private int currentParallelism;
+
+ private int newParallelism;
+
+ private Map<ScalingMetric, EvaluatedScalingMetric> metrics;
+
+ public ScalingSummary(
+ int currentParallelism,
+ int newParallelism,
+ Map<ScalingMetric, EvaluatedScalingMetric> metrics) {
+ if (currentParallelism == newParallelism) {
+ throw new IllegalArgumentException(
+ "Current parallelism should not be equal to newParallelism during scaling.");
+ }
+ this.currentParallelism = currentParallelism;
+ this.newParallelism = newParallelism;
+ this.metrics = metrics;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
new file mode 100644
index 00000000..e819fd89
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for scaling metrics collection logic. */
+@EnableKubernetesMockClient(crud = true)
+public class ScalingExecutorTest {
+
+ private ScalingExecutor scalingDecisionExecutor;
+ private Configuration conf;
+ private KubernetesClient kubernetesClient;
+ private FlinkDeployment flinkDep;
+
+ @BeforeEach
+ public void setup() {
+ scalingDecisionExecutor = new ScalingExecutor(kubernetesClient);
+ conf = new Configuration();
+ conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+ conf.set(AutoScalerOptions.SCALING_ENABLED, true);
+ conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+
+ flinkDep = TestUtils.buildApplicationCluster();
+ kubernetesClient.resource(flinkDep).createOrReplace();
+ flinkDep.getStatus()
+ .getJobStatus()
+ .setStartTime(String.valueOf(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testStabilizationPeriod() throws Exception {
+ conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMinutes(1));
+
+ var metrics = Map.of(new JobVertexID(), evaluated(1, 110, 100));
+
+ var scalingInfo = new AutoScalerInfo(new HashMap<>());
+ var clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
+ flinkDep.getStatus()
+ .getJobStatus()
+ .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+
+ scalingDecisionExecutor.setClock(clock);
+ assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+ clock = Clock.offset(clock, Duration.ofSeconds(30));
+ scalingDecisionExecutor.setClock(clock);
+ assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+ clock = Clock.offset(clock, Duration.ofSeconds(20));
+ scalingDecisionExecutor.setClock(clock);
+ assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+ clock = Clock.offset(clock, Duration.ofSeconds(20));
+ scalingDecisionExecutor.setClock(clock);
+ assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+ flinkDep.getStatus()
+ .getJobStatus()
+ .setStartTime(String.valueOf(clock.instant().toEpochMilli()));
+ assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+ clock = Clock.offset(clock, Duration.ofSeconds(59));
+ scalingDecisionExecutor.setClock(clock);
+ assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+
+ clock = Clock.offset(clock, Duration.ofSeconds(2));
+ scalingDecisionExecutor.setClock(clock);
+ assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
+ }
+
+ @Test
+ public void testUtilizationBoundaries() {
+ // Restart time should not affect utilization boundary
+ conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+ conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+
+ var flinkDep = TestUtils.buildApplicationCluster();
+ kubernetesClient.resource(flinkDep).createOrReplace();
+
+ var op1 = new JobVertexID();
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);
+
+ var evaluated = Map.of(op1, evaluated(1, 70, 100));
+ var scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
+ assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
+ evaluated = Map.of(op1, evaluated(1, 70, 100));
+ scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
+ assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+ assertNull(getScaledParallelism(flinkDep));
+
+ var op2 = new JobVertexID();
+ evaluated =
+ Map.of(
+ op1, evaluated(1, 70, 100),
+ op2, evaluated(1, 85, 100));
+ scalingSummary =
+ Map.of(
+ op1,
+ new ScalingSummary(1, 2, evaluated.get(op1)),
+ op2,
+ new ScalingSummary(1, 2, evaluated.get(op2)));
+
+ assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+ evaluated =
+ Map.of(
+ op1, evaluated(1, 70, 100),
+ op2, evaluated(1, 70, 100));
+ scalingSummary =
+ Map.of(
+ op1,
+ new ScalingSummary(1, 2, evaluated.get(op1)),
+ op2,
+ new ScalingSummary(1, 2, evaluated.get(op2)));
+ assertTrue(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+
+ // Test with backlog based scaling
+ evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
+ scalingSummary = Map.of(op1, new ScalingSummary(1, 2, evaluated.get(op1)));
+ assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
+ }
+
+ @Test
+ public void testParallelismScaling() {
+ var op = new JobVertexID();
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ assertEquals(
+ 5,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ assertEquals(
+ 8,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ assertEquals(
+ 10,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ assertEquals(
+ 8,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+
+ assertEquals(
+ 8,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+ assertEquals(
+ 10,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+ assertEquals(
+ 4,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
+ assertEquals(
+ 5,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
+ assertEquals(
+ 4,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+ }
+
+ @Test
+ public void testParallelismComputation() {
+ final int minParallelism = 1;
+ final int maxParallelism = Integer.MAX_VALUE;
+ assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism));
+ assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism));
+ assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism));
+ assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism));
+ assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, maxParallelism));
+ assertEquals(
+ 720,
+ ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+ }
+
+ @Test
+ public void testParallelismComputationWithLimit() {
+ assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700));
+ assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700));
+
+ assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
+ assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+
+ assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300));
+ assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+ }
+
+ @Test
+ public void ensureMinParallelismDoesNotExceedMax() {
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ assertEquals(
+ 600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
+ }
+
+ @Test
+ public void testMinParallelismLimitIsUsed() {
+ conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+ assertEquals(
+ 5,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf,
+ new JobVertexID(),
+ evaluated(10, 100, 500),
+ Collections.emptySortedMap()));
+ }
+
+ @Test
+ public void testMaxParallelismLimitIsUsed() {
+ conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ assertEquals(
+ 10,
+ scalingDecisionExecutor.computeScaleTargetParallelism(
+ conf,
+ new JobVertexID(),
+ evaluated(10, 500, 100),
+ Collections.emptySortedMap()));
+ }
+
+ @Test
+ public void testScaleDownAfterScaleUpDetection() throws Exception {
+ var op = new JobVertexID();
+ var scalingInfo = new AutoScalerInfo(new HashMap<>());
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1));
+
+ scalingDecisionExecutor.scaleResource(
+ flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
+ assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+
+ // Should now allow scale back down immediately
+ scalingDecisionExecutor.scaleResource(
+ flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
+ assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+
+ // Pass some time...
+ var clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
+ scalingDecisionExecutor.setClock(clock);
+ scalingDecisionExecutor.scaleResource(
+ flinkDep, scalingInfo, conf, Map.of(op, evaluated(10, 50, 100)));
+ assertEquals(Map.of(op, 5), getScaledParallelism(flinkDep));
+
+ // Allow immediate scale up
+ scalingDecisionExecutor.scaleResource(
+ flinkDep, scalingInfo, conf, Map.of(op, evaluated(5, 100, 50)));
+ assertEquals(Map.of(op, 10), getScaledParallelism(flinkDep));
+ }
+
+ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+ int parallelism, double target, double procRate, double catchupRate) {
+ var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+ metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
+ metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
+ metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
+ metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+ metrics.put(
+ ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
+ ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+ return metrics;
+ }
+
+ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+ int parallelism, double target, double procRate) {
+ return evaluated(parallelism, target, procRate, 0.);
+ }
+
+ protected static Map<JobVertexID, Integer> getScaledParallelism(
+ AbstractFlinkResource<?, ?> resource) {
+
+ var conf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
+ var overrides = conf.get(PARALLELISM_OVERRIDES);
+ if (overrides == null || overrides.isEmpty()) {
+ return null;
+ }
+
+ var out = new HashMap<JobVertexID, Integer>();
+
+ overrides.forEach((k, v) -> out.put(JobVertexID.fromHexString(k), Integer.parseInt(v)));
+ return out;
+ }
+}