You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/02/25 09:06:19 UTC

[flink] 02/04: [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac2aaf9277333a6d8ac5aa1c0c81189f56e6ffd4
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Feb 17 15:43:16 2020 +0800

    [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.
    
    This closes #11110.
---
 .../kubernetes/KubernetesResourceManager.java      |  3 ++-
 .../kubernetes/KubernetesResourceManagerTest.java  | 28 ++++++++++++++++++++++
 .../MesosTaskManagerParameters.java                |  4 ++--
 .../clusterframework/TaskExecutorProcessUtils.java |  5 ++++
 4 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index 4c94a28..fdf3afc 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -338,6 +339,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 
 	@Override
 	protected double getCpuCores(Configuration configuration) {
-		return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, numSlotsPerTaskManager);
+		return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
 	}
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
index 0d35a2a..08d8f51 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
@@ -80,6 +80,7 @@ import java.util.stream.Collectors;
 
 import static junit.framework.TestCase.assertEquals;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -287,6 +288,33 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
 			Matchers.containsInAnyOrder(taskManagerPrefix + "1-1", taskManagerPrefix + "2-1"));
 	}
 
+	@Test
+	public void testGetCpuCoresCommonOption() {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
+		configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		assertThat(resourceManager.getCpuCores(configuration), is(1.0));
+	}
+
+	@Test
+	public void testGetCpuCoresKubernetesOption() {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		assertThat(resourceManager.getCpuCores(configuration), is(2.0));
+	}
+
+	@Test
+	public void testGetCpuCoresNumSlots() {
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		assertThat(resourceManager.getCpuCores(configuration), is(3.0));
+	}
+
 	private TestingKubernetesResourceManager createAndStartResourceManager(Configuration configuration) throws Exception {
 
 		final TestingRpcService rpcService = new TestingRpcService(configuration);
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4cd8516..d4bf66f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -75,6 +75,7 @@ public class MesosTaskManagerParameters {
 
 	public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
 		key("mesos.resourcemanager.tasks.cpus")
+		.doubleType()
 		.defaultValue(0.0)
 		.withDescription("CPUs to assign to the Mesos workers.");
 
@@ -424,8 +425,7 @@ public class MesosTaskManagerParameters {
 	}
 
 	private static double getCpuCores(final Configuration configuration) {
-		double fallback = configuration.getDouble(MESOS_RM_TASKS_CPUS);
-		return TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
+		return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, MESOS_RM_TASKS_CPUS);
 	}
 
 	private static MemorySize getTotalProcessMemory(final Configuration configuration) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
index 917c575..9b7ae12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
@@ -644,6 +644,11 @@ public class TaskExecutorProcessUtils {
 		return getCpuCoresWithFallback(config, -1.0);
 	}
 
+	public static double getCpuCoresWithFallbackConfigOption(final Configuration config, ConfigOption<Double> fallbackOption) {
+		double fallbackValue = config.getDouble(fallbackOption);
+		return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, fallbackValue).getValue().doubleValue();
+	}
+
 	public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) {
 		final double cpuCores;
 		if (config.contains(TaskManagerOptions.CPU_CORES)) {