You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/12 11:57:07 UTC
[flink-kubernetes-operator] 01/04: Decouple scaling execution from parallelism computation
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit a66993089eb0e8fb01503d8e1b4f54b0f3983ade
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jan 5 16:12:04 2023 +0100
Decouple scaling execution from parallelism computation
---
.../operator/autoscaler/JobVertexScaler.java | 189 +++++++++++++++++++++
.../operator/autoscaler/ScalingExecutor.java | 148 +---------------
.../operator/autoscaler/JobVertexScalerTest.java | 186 ++++++++++++++++++++
.../operator/autoscaler/ScalingExecutorTest.java | 122 +------------
4 files changed, 384 insertions(+), 261 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
new file mode 100644
index 00000000..7b21a244
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import java.util.SortedMap;
+
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+
+/**
+ * Component responsible for computing vertex parallelism based on the scaling metrics.
+ *
+ * <p>This class only calculates a target parallelism for a single job vertex and returns it; it
+ * does not apply the result.
+ */
+public class JobVertexScaler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);
+ // Time source for the scale-up grace period check; tests can replace it via setClock.
+ private Clock clock = Clock.system(ZoneId.systemDefault());
+ /**
+ * Computes the parallelism the given vertex should be scaled to.
+ *
+ * <p>The scale factor is the target processing capacity divided by the average true processing
+ * rate, and is never allowed to drop below {@code 1 - MAX_SCALE_DOWN_FACTOR}. The current
+ * parallelism is returned unchanged when either rate is NaN, or when the result would be a
+ * scale down that follows a scale up within the configured grace period.
+ *
+ * @param conf configuration supplying the target utilization, max scale down factor, vertex
+ *     min/max parallelism and scale-up grace period
+ * @param vertex id of the vertex, used only in log messages
+ * @param evaluatedMetrics evaluated metrics of the vertex; the PARALLELISM,
+ *     TRUE_PROCESSING_RATE and MAX_PARALLELISM entries are dereferenced without a null check,
+ *     and the map is also passed to {@code AutoScalerUtils.getTargetProcessingCapacity}
+ * @param history past scaling summaries keyed by timestamp; may be empty
+ * @return the new target parallelism, or the current parallelism if no change can or should be
+ *     made
+ */
+ public int computeScaleTargetParallelism(
+ Configuration conf,
+ JobVertexID vertex,
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
+ SortedMap<Instant, ScalingSummary> history) {
+
+ var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
+ double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
+
+ if (Double.isNaN(averageTrueProcessingRate)) {
+ LOG.info(
+ "True processing rate is not available for {}, cannot compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ double targetCapacity =
+ AutoScalerUtils.getTargetProcessingCapacity(
+ evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
+ if (Double.isNaN(targetCapacity)) {
+ LOG.info(
+ "Target data rate is not available for {}, cannot compute new parallelism",
+ vertex);
+ return currentParallelism;
+ }
+
+ LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
+ double scaleFactor = targetCapacity / averageTrueProcessingRate;
+ double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ if (scaleFactor < minScaleFactor) {
+ LOG.info(
+ "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
+ scaleFactor,
+ vertex,
+ minScaleFactor);
+ scaleFactor = minScaleFactor;
+ }
+ // The MAX_PARALLELISM metric is handed to scale() as the number of key groups.
+ int newParallelism =
+ scale(
+ currentParallelism,
+ (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
+ scaleFactor,
+ conf.getInteger(VERTEX_MIN_PARALLELISM),
+ conf.getInteger(VERTEX_MAX_PARALLELISM));
+ // Do not undo a recent scale up: within the grace period a scale down is suppressed.
+ if (!history.isEmpty()) {
+ if (detectImmediateScaleDownAfterScaleUp(
+ conf, history, currentParallelism, newParallelism)) {
+ LOG.info(
+ "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
+ vertex,
+ currentParallelism);
+ newParallelism = currentParallelism;
+ }
+ // NOTE(review): nothing below is implemented; it reads like a sketch of a future
+ // history-based check (skip a scale down that the observed rate cannot sustain) - confirm.
+ // Example: currentParallelism = 2, newParallelism = 1, minimumProcRate = 1000 r/s
+ // history: currentParallelism 1 => 3 -> empiricalProcRate = 800
+ // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
+ }
+
+ return newParallelism;
+ }
+ /**
+ * Checks whether the proposed change is a scale down that directly follows a scale up.
+ *
+ * <p>Only the most recent entry of {@code history} is inspected, so the map must not be empty
+ * ({@link SortedMap#lastKey()} throws on an empty map).
+ *
+ * @return true if {@code newParallelism < currentParallelism}, the last recorded scaling
+ *     increased the parallelism, and less time than SCALE_UP_GRACE_PERIOD has passed since
+ *     the timestamp of that scaling
+ */
+ private boolean detectImmediateScaleDownAfterScaleUp(
+ Configuration conf,
+ SortedMap<Instant, ScalingSummary> history,
+ int currentParallelism,
+ int newParallelism) {
+ var lastScalingTs = history.lastKey();
+ var lastSummary = history.get(lastScalingTs);
+
+ boolean isScaleDown = newParallelism < currentParallelism;
+ boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
+
+ var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
+ // A negative (elapsed - gracePeriod) means the grace period has not fully passed yet.
+ boolean withinConfiguredTime =
+ Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
+
+ return isScaleDown && lastScaleUp && withinConfiguredTime;
+ }
+ /**
+ * Scales {@code parallelism} by {@code scaleFactor} and fits the result to the given limits.
+ *
+ * <p>The product is rounded up, clamped to {@code [minParallelism, min(numKeyGroups,
+ * maxParallelism)]} (the upper bound wins if it is below {@code minParallelism}), and then
+ * moved up to the next value that divides {@code numKeyGroups} evenly, provided such a value
+ * exists within both {@code numKeyGroups / 2} and the upper bound.
+ *
+ * <p>NOTE(review): if the clamped value can be 0 (which needs a non-positive {@code
+ * minParallelism} or {@code numKeyGroups}), the divisor search would evaluate {@code
+ * numKeyGroups % 0} - confirm that callers and configuration rule this out.
+ *
+ * @param parallelism current parallelism of the vertex
+ * @param numKeyGroups max parallelism of the operator (number of key groups)
+ * @param scaleFactor factor the current parallelism is multiplied with
+ * @param minParallelism lower limit for the result
+ * @param maxParallelism upper limit for the result; {@code Integer.MAX_VALUE} suppresses the
+ *     warning about exceeding {@code numKeyGroups}
+ * @return the adjusted parallelism
+ * @throws IllegalArgumentException if {@code minParallelism > maxParallelism}
+ */
+ @VisibleForTesting
+ protected static int scale(
+ int parallelism,
+ int numKeyGroups,
+ double scaleFactor,
+ int minParallelism,
+ int maxParallelism) {
+ Preconditions.checkArgument(
+ minParallelism <= maxParallelism,
+ "The minimum parallelism must not be greater than the maximum parallelism.");
+ if (minParallelism > numKeyGroups) {
+ LOG.warn(
+ "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
+ minParallelism,
+ numKeyGroups);
+ }
+ if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
+ LOG.warn(
+ "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
+ maxParallelism,
+ numKeyGroups);
+ }
+
+ int newParallelism =
+ // Cap the rounded-up product at Integer.MAX_VALUE before narrowing it to int.
+ // No lower cap is needed at this point: values below minParallelism are
+ // raised by the min/max clamp applied further down.
+ (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
+
+ // Cap parallelism at either number of key groups or parallelism limit
+ final int upperBound = Math.min(numKeyGroups, maxParallelism);
+
+ // Apply min/max parallelism; the outer min() makes the upper bound take precedence
+ newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
+
+ // Try to adjust the parallelism upwards such that it divides the number of key groups
+ // without a remainder => state is evenly spread across subtasks
+ for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+ if (numKeyGroups % p == 0) {
+ return p;
+ }
+ }
+
+ // If key group adjustment fails, use the clamped parallelism computed above
+ return newParallelism;
+ }
+ /** Replaces the clock used for the grace period check; {@code clock} must not be null. */
+ @VisibleForTesting
+ protected void setClock(Clock clock) {
+ this.clock = Preconditions.checkNotNull(clock);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index df2d96a9..976ec56e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -35,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Clock;
-import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
@@ -43,15 +41,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.STABILIZATION_INTERVAL;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
@@ -70,11 +61,17 @@ public class ScalingExecutor implements Cleanup {
private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
private final KubernetesClient kubernetesClient;
+ private final JobVertexScaler jobVertexScaler;
private Clock clock = Clock.system(ZoneId.systemDefault());
public ScalingExecutor(KubernetesClient kubernetesClient) {
+ this(kubernetesClient, new JobVertexScaler());
+ }
+
+ public ScalingExecutor(KubernetesClient kubernetesClient, JobVertexScaler jobVertexScaler) {
this.kubernetesClient = kubernetesClient;
+ this.jobVertexScaler = jobVertexScaler;
}
public boolean scaleResource(
@@ -192,7 +189,7 @@ public class ScalingExecutor implements Cleanup {
var currentParallelism =
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
var newParallelism =
- computeScaleTargetParallelism(
+ jobVertexScaler.computeScaleTargetParallelism(
conf,
v,
metrics,
@@ -205,136 +202,6 @@ public class ScalingExecutor implements Cleanup {
return out;
}
- protected int computeScaleTargetParallelism(
- Configuration conf,
- JobVertexID vertex,
- Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
- SortedMap<Instant, ScalingSummary> history) {
-
- var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
- double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
-
- if (Double.isNaN(averageTrueProcessingRate)) {
- LOG.info(
- "True processing rate is not available for {}, cannot compute new parallelism",
- vertex);
- return currentParallelism;
- }
-
- double targetCapacity =
- AutoScalerUtils.getTargetProcessingCapacity(
- evaluatedMetrics, conf, conf.get(TARGET_UTILIZATION), true);
- if (Double.isNaN(targetCapacity)) {
- LOG.info(
- "Target data rate is not available for {}, cannot compute new parallelism",
- vertex);
- return currentParallelism;
- }
-
- LOG.info("Target processing capacity for {} is {}", vertex, targetCapacity);
- double scaleFactor = targetCapacity / averageTrueProcessingRate;
- double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
- if (scaleFactor < minScaleFactor) {
- LOG.info(
- "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
- scaleFactor,
- vertex,
- minScaleFactor);
- scaleFactor = minScaleFactor;
- }
-
- int newParallelism =
- scale(
- currentParallelism,
- (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
- scaleFactor,
- conf.getInteger(VERTEX_MIN_PARALLELISM),
- conf.getInteger(VERTEX_MAX_PARALLELISM));
-
- if (!history.isEmpty()) {
- if (detectImmediateScaleDownAfterScaleUp(
- conf, history, currentParallelism, newParallelism)) {
- LOG.info(
- "Skipping immediate scale down after scale up for {} resetting target parallelism to {}",
- vertex,
- currentParallelism);
- newParallelism = currentParallelism;
- }
-
- // currentParallelism = 2 , newParallelism = 1, minimumProcRate = 1000 r/s
- // history
- // currentParallelism 1 => 3 -> empiricalProcRate = 800
- // empiricalProcRate + upperBoundary < minimumProcRate => don't scale
- }
-
- return newParallelism;
- }
-
- private boolean detectImmediateScaleDownAfterScaleUp(
- Configuration conf,
- SortedMap<Instant, ScalingSummary> history,
- int currentParallelism,
- int newParallelism) {
- var lastScalingTs = history.lastKey();
- var lastSummary = history.get(lastScalingTs);
-
- boolean isScaleDown = newParallelism < currentParallelism;
- boolean lastScaleUp = lastSummary.getNewParallelism() > lastSummary.getCurrentParallelism();
-
- var gracePeriod = conf.get(SCALE_UP_GRACE_PERIOD);
-
- boolean withinConfiguredTime =
- Duration.between(lastScalingTs, clock.instant()).minus(gracePeriod).isNegative();
-
- return isScaleDown && lastScaleUp && withinConfiguredTime;
- }
-
- public static int scale(
- int parallelism,
- int numKeyGroups,
- double scaleFactor,
- int minParallelism,
- int maxParallelism) {
- Preconditions.checkArgument(
- minParallelism <= maxParallelism,
- "The minimum parallelism must not be greater than the maximum parallelism.");
- if (minParallelism > numKeyGroups) {
- LOG.warn(
- "Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.",
- minParallelism,
- numKeyGroups);
- }
- if (numKeyGroups < maxParallelism && maxParallelism != Integer.MAX_VALUE) {
- LOG.warn(
- "Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.",
- maxParallelism,
- numKeyGroups);
- }
-
- int newParallelism =
- // Prevent integer overflow when converting from double to integer.
- // We do not have to detect underflow because doubles cannot
- // underflow.
- (int) Math.min(Math.ceil(scaleFactor * parallelism), Integer.MAX_VALUE);
-
- // Cap parallelism at either number of key groups or parallelism limit
- final int upperBound = Math.min(numKeyGroups, maxParallelism);
-
- // Apply min/max parallelism
- newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
-
- // Try to adjust the parallelism such that it divides the number of key groups without a
- // remainder => state is evenly spread across subtasks
- for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
- if (numKeyGroups % p == 0) {
- return p;
- }
- }
-
- // If key group adjustment fails, use originally computed parallelism
- return newParallelism;
- }
-
private void setVertexParallelismOverrides(
AbstractFlinkResource<?, ?> resource,
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
@@ -362,6 +229,7 @@ public class ScalingExecutor implements Cleanup {
@VisibleForTesting
protected void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
+ jobVertexScaler.setClock(clock);
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
new file mode 100644
index 00000000..e3a774f7
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for vertex parallelism scaler logic.
+ *
+ * <p>NOTE(review): this class combines JUnit 5 annotations and assertions with the JUnit 4
+ * {@code org.junit.Assert.assertThrows} - consider using the Jupiter equivalent instead.
+ */
+public class JobVertexScalerTest {
+
+ private JobVertexScaler vertexScaler;
+ private Configuration conf;
+ /**
+ * Creates a fresh scaler and a configuration with a max scale down factor of 1.0 and a
+ * catch-up duration of zero before every test.
+ */
+ @BeforeEach
+ public void setup() {
+ vertexScaler = new JobVertexScaler();
+ conf = new Configuration();
+ conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+ }
+ /**
+ * Verifies the computed parallelism for several target utilization and max scale down factor
+ * settings, always with an empty scaling history.
+ */
+ @Test
+ public void testParallelismScaling() {
+ var op = new JobVertexID();
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ assertEquals(
+ 5,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ assertEquals(
+ 8,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ assertEquals(
+ 10,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
+ assertEquals(
+ 8,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
+
+ assertEquals(
+ 8,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
+ assertEquals(
+ 10,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
+ assertEquals(
+ 4,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
+ assertEquals(
+ 5,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
+ assertEquals(
+ 4,
+ vertexScaler.computeScaleTargetParallelism(
+ conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+ }
+ /** Verifies {@code scale()} with a min parallelism of 1 and no explicit max parallelism. */
+ @Test
+ public void testParallelismComputation() {
+ final int minParallelism = 1;
+ final int maxParallelism = Integer.MAX_VALUE;
+ assertEquals(1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, maxParallelism));
+ assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, maxParallelism));
+ assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, maxParallelism));
+ assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, maxParallelism));
+ assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, maxParallelism));
+ assertEquals(
+ 720,
+ JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+ }
+ /** Verifies {@code scale()} with explicit min and max parallelism limits. */
+ @Test
+ public void testParallelismComputationWithLimit() {
+ assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700));
+ assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700));
+
+ assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
+ assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+
+ assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300));
+ assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+ }
+ /**
+ * Verifies that {@code scale()} rejects a min parallelism (500) above the max parallelism
+ * (499). The expected value 600 is never compared when {@code scale()} throws as expected.
+ */
+ @Test
+ public void ensureMinParallelismDoesNotExceedMax() {
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ assertEquals(
+ 600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
+ }
+ /** Expects the result to equal the configured vertex min parallelism of 5. */
+ @Test
+ public void testMinParallelismLimitIsUsed() {
+ conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+ assertEquals(
+ 5,
+ vertexScaler.computeScaleTargetParallelism(
+ conf,
+ new JobVertexID(),
+ evaluated(10, 100, 500),
+ Collections.emptySortedMap()));
+ }
+ /** Expects the result to equal the configured vertex max parallelism of 10. */
+ @Test
+ public void testMaxParallelismLimitIsUsed() {
+ conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ assertEquals(
+ 10,
+ vertexScaler.computeScaleTargetParallelism(
+ conf,
+ new JobVertexID(),
+ evaluated(10, 500, 100),
+ Collections.emptySortedMap()));
+ }
+ /**
+ * Builds the evaluated metrics of a vertex from the given parallelism, target data rate, true
+ * processing rate and catch-up data rate. MAX_PARALLELISM is fixed to 720, and the map is
+ * passed to {@code ScalingMetricEvaluator.computeProcessingRateThresholds} before it is
+ * returned.
+ */
+ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+ int parallelism, double target, double procRate, double catchupRate) {
+ var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+ metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
+ metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
+ metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
+ metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
+ metrics.put(
+ ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate));
+ ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+ return metrics;
+ }
+ /** Shortcut for the four-argument {@code evaluated} with a catch-up data rate of 0. */
+ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
+ int parallelism, double target, double procRate) {
+ return evaluated(parallelism, target, procRate, 0.);
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 9aa67145..e1dc139e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -37,7 +36,6 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -47,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/** Test for scaling metrics collection logic. */
+/** Test for scaling execution logic. */
@EnableKubernetesMockClient(crud = true)
public class ScalingExecutorTest {
@@ -172,124 +170,6 @@ public class ScalingExecutorTest {
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
}
- @Test
- public void testParallelismScaling() {
- var op = new JobVertexID();
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- assertEquals(
- 5,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
- assertEquals(
- 8,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 50, 100), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
- assertEquals(
- 10,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 80, 100), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
- assertEquals(
- 8,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 60, 100), Collections.emptySortedMap()));
-
- assertEquals(
- 8,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 59, 100), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
- assertEquals(
- 10,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(2, 100, 40), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
- assertEquals(
- 4,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(2, 100, 100), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
- assertEquals(
- 5,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
-
- conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
- assertEquals(
- 4,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
- }
-
- @Test
- public void testParallelismComputation() {
- final int minParallelism = 1;
- final int maxParallelism = Integer.MAX_VALUE;
- assertEquals(1, ScalingExecutor.scale(1, 720, 0.0001, minParallelism, maxParallelism));
- assertEquals(1, ScalingExecutor.scale(2, 720, 0.1, minParallelism, maxParallelism));
- assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, minParallelism, maxParallelism));
- assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, minParallelism, maxParallelism));
- assertEquals(400, ScalingExecutor.scale(200, 720, 2, minParallelism, maxParallelism));
- assertEquals(
- 720,
- ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
- }
-
- @Test
- public void testParallelismComputationWithLimit() {
- assertEquals(5, ScalingExecutor.scale(6, 720, 0.8, 1, 700));
- assertEquals(8, ScalingExecutor.scale(8, 720, 0.8, 8, 700));
-
- assertEquals(32, ScalingExecutor.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
- assertEquals(64, ScalingExecutor.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
-
- assertEquals(300, ScalingExecutor.scale(200, 720, 2, 1, 300));
- assertEquals(600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 1, 600));
- }
-
- @Test
- public void ensureMinParallelismDoesNotExceedMax() {
- Assert.assertThrows(
- IllegalArgumentException.class,
- () ->
- assertEquals(
- 600, ScalingExecutor.scale(200, 720, Integer.MAX_VALUE, 500, 499)));
- }
-
- @Test
- public void testMinParallelismLimitIsUsed() {
- conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
- assertEquals(
- 5,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf,
- new JobVertexID(),
- evaluated(10, 100, 500),
- Collections.emptySortedMap()));
- }
-
- @Test
- public void testMaxParallelismLimitIsUsed() {
- conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
- conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
- assertEquals(
- 10,
- scalingDecisionExecutor.computeScaleTargetParallelism(
- conf,
- new JobVertexID(),
- evaluated(10, 500, 100),
- Collections.emptySortedMap()));
- }
-
@Test
public void testScaleDownAfterScaleUpDetection() throws Exception {
var op = new JobVertexID();