You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/05/11 07:40:53 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-31936] Support setting scale up max factor
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f24668db [FLINK-31936] Support setting scale up max factor
f24668db is described below
commit f24668dbe126551caeee570aa0953c689648a335
Author: Zhanghao Chen <m1...@outlook.com>
AuthorDate: Thu May 11 15:40:46 2023 +0800
[FLINK-31936] Support setting scale up max factor
---
.../shortcodes/generated/auto_scaler_configuration.html | 6 ++++++
.../kubernetes/operator/autoscaler/JobVertexScaler.java | 16 +++++++++++++++-
.../operator/autoscaler/config/AutoScalerOptions.java | 7 +++++++
.../operator/autoscaler/BacklogBasedScalingTest.java | 1 +
.../operator/autoscaler/JobVertexScalerTest.java | 14 ++++++++++++++
.../autoscaler/MetricsCollectionAndEvaluationTest.java | 1 +
.../operator/autoscaler/ScalingExecutorTest.java | 1 +
7 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index e97f9382..da3c5b9b 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -68,6 +68,12 @@
<td>Duration</td>
<td>Duration in which no scale down of a vertex is allowed after it has been scaled up.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.job.autoscaler.scale-up.max-factor</h5></td>
+ <td style="word-wrap: break-word;">2.147483647E9</td>
+ <td>Double</td>
+ <td>Max scale up factor. 2.0 means job can only be scaled up by 200% of the current parallelism, i.e. to at most 3x the current parallelism.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.autoscaler.scaling.effectiveness.detection.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index f0d6a183..90d46a87 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.SortedMap;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
@@ -93,6 +94,7 @@ public class JobVertexScaler {
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
double scaleFactor = targetCapacity / averageTrueProcessingRate;
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+ double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
if (scaleFactor < minScaleFactor) {
LOG.debug(
"Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
@@ -100,8 +102,19 @@ public class JobVertexScaler {
vertex,
minScaleFactor);
scaleFactor = minScaleFactor;
+ } else if (scaleFactor > maxScaleFactor) {
+ LOG.debug(
+ "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
+ scaleFactor,
+ vertex,
+ maxScaleFactor);
+ scaleFactor = maxScaleFactor;
}
+ // Cap target capacity according to the capped scale factor
+ double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
+ LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
+
int newParallelism =
scale(
currentParallelism,
@@ -124,7 +137,8 @@ public class JobVertexScaler {
// We record our expectations for this scaling operation
evaluatedMetrics.put(
- ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(targetCapacity));
+ ScalingMetric.EXPECTED_PROCESSING_RATE,
+ EvaluatedScalingMetric.of(cappedTargetCapacity));
return newParallelism;
}
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index df32aa5e..4f21fb9d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -99,6 +99,13 @@ public class AutoScalerOptions {
.withDescription(
"Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.");
+ public static final ConfigOption<Double> MAX_SCALE_UP_FACTOR =
+ autoScalerConfig("scale-up.max-factor")
+ .doubleType()
+ .defaultValue((double) Integer.MAX_VALUE)
+ .withDescription(
+ "Max scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism.");
+
public static final ConfigOption<Duration> CATCH_UP_DURATION =
autoScalerConfig("catch-up.duration")
.durationType()
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 7df42c6a..6809dd86 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -100,6 +100,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index d4c3d6a0..7f50facb 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -66,6 +66,7 @@ public class JobVertexScalerTest {
vertexScaler = new JobVertexScaler(new EventRecorder(kubernetesClient, eventCollector));
conf = new Configuration();
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
}
@@ -125,6 +126,19 @@ public class JobVertexScalerTest {
4,
vertexScaler.computeScaleTargetParallelism(
flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+ conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
+ assertEquals(
+ 15,
+ vertexScaler.computeScaleTargetParallelism(
+ flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
+
+ conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
+ assertEquals(
+ 16,
+ vertexScaler.computeScaleTargetParallelism(
+ flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
}
@Test
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 88ae09b5..27341510 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -119,6 +119,7 @@ public class MetricsCollectionAndEvaluationTest {
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
conf.set(AutoScalerOptions.SCALING_ENABLED, true);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
clock = Clock.fixed(Instant.ofEpochSecond(0), ZoneId.systemDefault());
metricsCollector.setClock(clock);
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 0bff7357..71103e0e 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -74,6 +74,7 @@ public class ScalingExecutorTest {
conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.SCALING_ENABLED, true);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+ conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
flinkDep = TestUtils.buildApplicationCluster();