You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/02/25 09:06:19 UTC
[flink] 02/04: [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac2aaf9277333a6d8ac5aa1c0c81189f56e6ffd4
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Feb 17 15:43:16 2020 +0800
[FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.
This closes #11110.
---
.../kubernetes/KubernetesResourceManager.java | 3 ++-
.../kubernetes/KubernetesResourceManagerTest.java | 28 ++++++++++++++++++++++
.../MesosTaskManagerParameters.java | 4 ++--
.../clusterframework/TaskExecutorProcessUtils.java | 5 ++++
4 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index 4c94a28..fdf3afc 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -338,6 +339,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
@Override
protected double getCpuCores(Configuration configuration) {
- return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, numSlotsPerTaskManager);
+ return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
index 0d35a2a..08d8f51 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
@@ -80,6 +80,7 @@ import java.util.stream.Collectors;
import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -287,6 +288,33 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
Matchers.containsInAnyOrder(taskManagerPrefix + "1-1", taskManagerPrefix + "2-1"));
}
+ @Test
+ public void testGetCpuCoresCommonOption() {
+ final Configuration configuration = new Configuration();
+ configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
+ configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+ assertThat(resourceManager.getCpuCores(configuration), is(1.0));
+ }
+
+ @Test
+ public void testGetCpuCoresKubernetesOption() {
+ final Configuration configuration = new Configuration();
+ configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+ assertThat(resourceManager.getCpuCores(configuration), is(2.0));
+ }
+
+ @Test
+ public void testGetCpuCoresNumSlots() {
+ final Configuration configuration = new Configuration();
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+ assertThat(resourceManager.getCpuCores(configuration), is(3.0));
+ }
+
private TestingKubernetesResourceManager createAndStartResourceManager(Configuration configuration) throws Exception {
final TestingRpcService rpcService = new TestingRpcService(configuration);
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4cd8516..d4bf66f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -75,6 +75,7 @@ public class MesosTaskManagerParameters {
public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
key("mesos.resourcemanager.tasks.cpus")
+ .doubleType()
.defaultValue(0.0)
.withDescription("CPUs to assign to the Mesos workers.");
@@ -424,8 +425,7 @@ public class MesosTaskManagerParameters {
}
private static double getCpuCores(final Configuration configuration) {
- double fallback = configuration.getDouble(MESOS_RM_TASKS_CPUS);
- return TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
+ return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, MESOS_RM_TASKS_CPUS);
}
private static MemorySize getTotalProcessMemory(final Configuration configuration) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
index 917c575..9b7ae12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
@@ -644,6 +644,11 @@ public class TaskExecutorProcessUtils {
return getCpuCoresWithFallback(config, -1.0);
}
+ public static double getCpuCoresWithFallbackConfigOption(final Configuration config, ConfigOption<Double> fallbackOption) {
+ double fallbackValue = config.getDouble(fallbackOption);
+ return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, fallbackValue).getValue().doubleValue();
+ }
+
public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) {
final double cpuCores;
if (config.contains(TaskManagerOptions.CPU_CORES)) {