You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/02/09 07:23:50 UTC
[flink] branch master updated: [FLINK-21102] Add ScaleUpController
for declarative scheduler
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b59057c [FLINK-21102] Add ScaleUpController for declarative scheduler
b59057c is described below
commit b59057cdec0339e73a6252d99d20c2d67b205b24
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Thu Feb 4 16:24:36 2021 +0100
[FLINK-21102] Add ScaleUpController for declarative scheduler
---
.../generated/all_jobmanager_section.html | 6 +++
.../generated/expert_scheduling_section.html | 6 +++
.../generated/job_manager_configuration.html | 6 +++
.../flink/configuration/JobManagerOptions.java | 11 +++++
.../scalingpolicy/ReactiveScaleUpController.java | 40 ++++++++++++++++++
.../scalingpolicy/ScaleUpController.java | 39 ++++++++++++++++++
.../scalingpolicy/ScaleUpControllerTest.java | 48 ++++++++++++++++++++++
7 files changed, 156 insertions(+)
diff --git a/docs/_includes/generated/all_jobmanager_section.html b/docs/_includes/generated/all_jobmanager_section.html
index 3cf1e61..b7f3979 100644
--- a/docs/_includes/generated/all_jobmanager_section.html
+++ b/docs/_includes/generated/all_jobmanager_section.html
@@ -15,6 +15,12 @@
<td>Dictionary for JobManager to store the archives of completed jobs.</td>
</tr>
<tr>
+ <td><h5>jobmanager.declarative-scheduler.min-parallelism-increase</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>Configure the minimum increase in parallelism for a job to scale up.</td>
+ </tr>
+ <tr>
<td><h5>jobmanager.execution.attempts-history-size</h5></td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
diff --git a/docs/_includes/generated/expert_scheduling_section.html b/docs/_includes/generated/expert_scheduling_section.html
index 9020268..f796ab5 100644
--- a/docs/_includes/generated/expert_scheduling_section.html
+++ b/docs/_includes/generated/expert_scheduling_section.html
@@ -15,6 +15,12 @@
<td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <span markdown="span">`TaskExecutors`</span>.</td>
</tr>
<tr>
+ <td><h5>jobmanager.declarative-scheduler.min-parallelism-increase</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>Configure the minimum increase in parallelism for a job to scale up.</td>
+ </tr>
+ <tr>
<td><h5>slot.idle.timeout</h5></td>
<td style="word-wrap: break-word;">50000</td>
<td>Long</td>
diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 793616b..f16b753 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -21,6 +21,12 @@
<td>The local address of the network interface that the job manager binds to. If not configured, '0.0.0.0' will be used.</td>
</tr>
<tr>
+ <td><h5>jobmanager.declarative-scheduler.min-parallelism-increase</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>Configure the minimum increase in parallelism for a job to scale up.</td>
+ </tr>
+ <tr>
<td><h5>jobmanager.execution.attempts-history-size</h5></td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index ed5f423..7518ba6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -358,6 +358,17 @@ public class JobManagerOptions {
.list(text("'ng': new generation scheduler"))
.build());
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Integer> MIN_PARALLELISM_INCREASE =
+ key("jobmanager.declarative-scheduler.min-parallelism-increase")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "Configure the minimum increase in parallelism for a job to scale up.");
+
/**
* Config parameter controlling whether partitions should already be released during the job
* execution.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ReactiveScaleUpController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ReactiveScaleUpController.java
new file mode 100644
index 0000000..a3bb4a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ReactiveScaleUpController.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.scalingpolicy;
+
+import org.apache.flink.configuration.Configuration;
+
+import static org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE;
+
+/**
+ * Simple scaling policy for a reactive mode. The user can configure a minimum cumulative
+ * parallelism increase to allow a scale up.
+ */
+public class ReactiveScaleUpController implements ScaleUpController {
+
+ private final int minParallelismIncrease;
+
+ public ReactiveScaleUpController(Configuration configuration) {
+ minParallelismIncrease = configuration.get(MIN_PARALLELISM_INCREASE);
+ }
+
+ @Override
+ public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
+ return newCumulativeParallelism - currentCumulativeParallelism >= minParallelismIncrease;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpController.java
new file mode 100644
index 0000000..b05a683
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpController.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.scalingpolicy;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Simple policy for controlling the scale up behavior of the {@link
+ * org.apache.flink.runtime.scheduler.declarative.DeclarativeScheduler}.
+ */
+@Internal
+public interface ScaleUpController {
+
+ /**
+ * This method gets called whenever new resources are available to the scheduler to scale up.
+ *
+ * @param currentCumulativeParallelism Cumulative parallelism of the currently running job
+ * graph.
+ * @param newCumulativeParallelism Potential new cumulative parallelism with the additional
+ * resources.
+ * @return true if the policy decided to scale up based on the provided information.
+ */
+ boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpControllerTest.java
new file mode 100644
index 0000000..2e762fa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/scalingpolicy/ScaleUpControllerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.scalingpolicy;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link ScaleUpController}. */
+public class ScaleUpControllerTest extends TestLogger {
+ private static final Configuration TEST_CONFIG = new Configuration();
+
+ static {
+ TEST_CONFIG.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 2);
+ }
+
+ @Test
+ public void testScaleUp() {
+ ScaleUpController suc = new ReactiveScaleUpController(TEST_CONFIG);
+ assertThat(suc.canScaleUp(1, 4), is(true));
+ }
+
+ @Test
+ public void testNoScaleUp() {
+ ScaleUpController suc = new ReactiveScaleUpController(TEST_CONFIG);
+ assertThat(suc.canScaleUp(2, 3), is(false));
+ }
+}