You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/02/09 07:23:50 UTC

[flink] branch master updated: [FLINK-21102] Add ScaleUpController for declarative scheduler

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b59057c  [FLINK-21102] Add ScaleUpController for declarative scheduler
b59057c is described below

commit b59057cdec0339e73a6252d99d20c2d67b205b24
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Thu Feb 4 16:24:36 2021 +0100

    [FLINK-21102] Add ScaleUpController for declarative scheduler
---
 .../generated/all_jobmanager_section.html          |  6 +++
 .../generated/expert_scheduling_section.html       |  6 +++
 .../generated/job_manager_configuration.html       |  6 +++
 .../flink/configuration/JobManagerOptions.java     | 11 +++++
 .../scalingpolicy/ReactiveScaleUpController.java   | 40 ++++++++++++++++++
 .../scalingpolicy/ScaleUpController.java           | 39 ++++++++++++++++++
 .../scalingpolicy/ScaleUpControllerTest.java       | 48 ++++++++++++++++++++++
 7 files changed, 156 insertions(+)

diff --git a/docs/_includes/generated/all_jobmanager_section.html b/docs/_includes/generated/all_jobmanager_section.html
index 3cf1e61..b7f3979 100644
--- a/docs/_includes/generated/all_jobmanager_section.html
+++ b/docs/_includes/generated/all_jobmanager_section.html
@@ -15,6 +15,12 @@
             <td>Dictionary for JobManager to store the archives of completed jobs.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.declarative-scheduler.min-parallelism-increase</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Configure the minimum increase in parallelism for a job to scale up.</td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.execution.attempts-history-size</h5></td>
             <td style="word-wrap: break-word;">16</td>
             <td>Integer</td>
diff --git a/docs/_includes/generated/expert_scheduling_section.html b/docs/_includes/generated/expert_scheduling_section.html
index 9020268..f796ab5 100644
--- a/docs/_includes/generated/expert_scheduling_section.html
+++ b/docs/_includes/generated/expert_scheduling_section.html
@@ -15,6 +15,12 @@
             <td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <span markdown="span">`TaskExecutors`</span>.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.declarative-scheduler.min-parallelism-increase</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Configure the minimum increase in parallelism for a job to scale up.</td>
+        </tr>
+        <tr>
             <td><h5>slot.idle.timeout</h5></td>
             <td style="word-wrap: break-word;">50000</td>
             <td>Long</td>
diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 793616b..f16b753 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -21,6 +21,12 @@
             <td>The local address of the network interface that the job manager binds to. If not configured, '0.0.0.0' will be used.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.declarative-scheduler.min-parallelism-increase</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Configure the minimum increase in parallelism for a job to scale up.</td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.execution.attempts-history-size</h5></td>
             <td style="word-wrap: break-word;">16</td>
             <td>Integer</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index ed5f423..7518ba6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -358,6 +358,17 @@ public class JobManagerOptions {
                                     .list(text("'ng': new generation scheduler"))
                                     .build());
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Integer> MIN_PARALLELISM_INCREASE =
+            key("jobmanager.declarative-scheduler.min-parallelism-increase")
+                    .intType()
+                    .defaultValue(1) // default: an increase of 1 is already enough to scale up
+                    .withDescription(
+                            "Configure the minimum increase in parallelism for a job to scale up.");
+
     /**
      * Config parameter controlling whether partitions should already be released during the job
      * execution.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ReactiveScaleUpController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ReactiveScaleUpController.java
new file mode 100644
index 0000000..a3bb4a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ReactiveScaleUpController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.scalingpolicy;
+
+import org.apache.flink.configuration.Configuration;
+
+import static org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE;
+
+/**
+ * Scale-up policy for reactive mode. A scale up is only permitted once the cumulative
+ * parallelism would grow by at least the user-configured minimum increase.
+ */
+public class ReactiveScaleUpController implements ScaleUpController {
+    private final int minParallelismIncrease;
+
+    public ReactiveScaleUpController(Configuration configuration) {
+        this.minParallelismIncrease = configuration.get(MIN_PARALLELISM_INCREASE);
+    }
+
+    @Override
+    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
+        final int parallelismGain = newCumulativeParallelism - currentCumulativeParallelism;
+        return parallelismGain >= minParallelismIncrease;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpController.java
new file mode 100644
index 0000000..b05a683
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpController.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.scalingpolicy;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Policy that decides whether the {@link
+ * org.apache.flink.runtime.scheduler.declarative.DeclarativeScheduler} should scale up a job.
+ */
+@Internal
+public interface ScaleUpController {
+
+    /**
+     * Called whenever new resources become available to the scheduler for scaling up.
+     *
+     * @param currentCumulativeParallelism cumulative parallelism of the currently running job
+     *     graph.
+     * @param newCumulativeParallelism cumulative parallelism that could be reached with the
+     *     additional resources.
+     * @return {@code true} if the policy decides to scale up, {@code false} otherwise.
+     */
+    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpControllerTest.java
new file mode 100644
index 0000000..2e762fa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpControllerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.scalingpolicy;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests the scale-up decisions made by the {@link ReactiveScaleUpController}. */
+public class ScaleUpControllerTest extends TestLogger {
+    private static final Configuration TEST_CONFIG = new Configuration();
+
+    static {
+        TEST_CONFIG.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 2);
+    }
+
+    @Test
+    public void testScaleUp() {
+        // A gain of 3 (1 -> 4) reaches the configured minimum increase of 2.
+        assertThat(new ReactiveScaleUpController(TEST_CONFIG).canScaleUp(1, 4), is(true));
+    }
+
+    @Test
+    public void testNoScaleUp() {
+        // A gain of 1 (2 -> 3) stays below the configured minimum increase of 2.
+        assertThat(new ReactiveScaleUpController(TEST_CONFIG).canScaleUp(2, 3), is(false));
+    }
+}