You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/05/11 07:40:53 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-31936] Support setting scale up max factor

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f24668db [FLINK-31936] Support setting scale up max factor
f24668db is described below

commit f24668dbe126551caeee570aa0953c689648a335
Author: Zhanghao Chen <m1...@outlook.com>
AuthorDate: Thu May 11 15:40:46 2023 +0800

    [FLINK-31936] Support setting scale up max factor
---
 .../shortcodes/generated/auto_scaler_configuration.html  |  6 ++++++
 .../kubernetes/operator/autoscaler/JobVertexScaler.java  | 16 +++++++++++++++-
 .../operator/autoscaler/config/AutoScalerOptions.java    |  7 +++++++
 .../operator/autoscaler/BacklogBasedScalingTest.java     |  1 +
 .../operator/autoscaler/JobVertexScalerTest.java         | 14 ++++++++++++++
 .../autoscaler/MetricsCollectionAndEvaluationTest.java   |  1 +
 .../operator/autoscaler/ScalingExecutorTest.java         |  1 +
 7 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index e97f9382..da3c5b9b 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -68,6 +68,12 @@
             <td>Duration</td>
             <td>Duration in which no scale down of a vertex is allowed after it has been scaled up.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scale-up.max-factor</h5></td>
+            <td style="word-wrap: break-word;">2.147483647E9</td>
+            <td>Double</td>
+            <td>Max scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.autoscaler.scaling.effectiveness.detection.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index f0d6a183..90d46a87 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.SortedMap;
 
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
@@ -93,6 +94,7 @@ public class JobVertexScaler {
         LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
         double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
         if (scaleFactor < minScaleFactor) {
             LOG.debug(
                     "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
@@ -100,8 +102,19 @@ public class JobVertexScaler {
                     vertex,
                     minScaleFactor);
             scaleFactor = minScaleFactor;
+        } else if (scaleFactor > maxScaleFactor) {
+            LOG.debug(
+                    "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
+                    scaleFactor,
+                    vertex,
+                    maxScaleFactor);
+            scaleFactor = maxScaleFactor;
         }
 
+        // Cap target capacity according to the capped scale factor
+        double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
+        LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
+
         int newParallelism =
                 scale(
                         currentParallelism,
@@ -124,7 +137,8 @@ public class JobVertexScaler {
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
-                ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(targetCapacity));
+                ScalingMetric.EXPECTED_PROCESSING_RATE,
+                EvaluatedScalingMetric.of(cappedTargetCapacity));
         return newParallelism;
     }
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index df32aa5e..4f21fb9d 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -99,6 +99,13 @@ public class AutoScalerOptions {
                     .withDescription(
                             "Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.");
 
+    public static final ConfigOption<Double> MAX_SCALE_UP_FACTOR =
+            autoScalerConfig("scale-up.max-factor")
+                    .doubleType()
+                    .defaultValue((double) Integer.MAX_VALUE)
+                    .withDescription(
+                            "Max scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism.");
+
     public static final ConfigOption<Duration> CATCH_UP_DURATION =
             autoScalerConfig("catch-up.duration")
                     .durationType()
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 7df42c6a..6809dd86 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -100,6 +100,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
         defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
         defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
         defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
         defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index d4c3d6a0..7f50facb 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -66,6 +66,7 @@ public class JobVertexScalerTest {
         vertexScaler = new JobVertexScaler(new EventRecorder(kubernetesClient, eventCollector));
         conf = new Configuration();
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
     }
 
@@ -125,6 +126,19 @@ public class JobVertexScalerTest {
                 4,
                 vertexScaler.computeScaleTargetParallelism(
                         flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
+        assertEquals(
+                15,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
+        assertEquals(
+                16,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
     }
 
     @Test
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 88ae09b5..27341510 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -119,6 +119,7 @@ public class MetricsCollectionAndEvaluationTest {
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         conf.set(AutoScalerOptions.SCALING_ENABLED, true);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
         clock = Clock.fixed(Instant.ofEpochSecond(0), ZoneId.systemDefault());
         metricsCollector.setClock(clock);
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 0bff7357..71103e0e 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -74,6 +74,7 @@ public class ScalingExecutorTest {
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.SCALING_ENABLED, true);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
 
         flinkDep = TestUtils.buildApplicationCluster();