You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by az...@apache.org on 2020/02/25 09:06:17 UTC

[flink] branch release-1.10 updated (543e24c -> 82f6dc2)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 543e24c  [FLINK-16013][core] Make complex type config options could be parsed correctly
     new ab3e390  [hotfix] Minor clean-up in TaskExecutorProcessUtils.
     new ac2aaf9  [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.
     new 8858d75  [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order.
     new 82f6dc2  [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kubernetes/KubernetesResourceManager.java      |  3 +-
 .../kubernetes/KubernetesResourceManagerTest.java  | 28 +++++++++++
 .../MesosTaskManagerParameters.java                |  4 +-
 .../MesosTaskManagerParametersTest.java            |  3 ++
 .../clusterframework/TaskExecutorProcessUtils.java | 11 +++--
 .../apache/flink/yarn/YarnResourceManagerTest.java | 55 ++++++++++++++++++++++
 6 files changed, 96 insertions(+), 8 deletions(-)


[flink] 04/04: [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82f6dc2dbcf16f0cd7b28d562c2e2d75b3060eb3
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Feb 17 19:57:43 2020 +0800

    [hotfix][mesos][test] Update test cases for validating Mesos deployment respecting the cpu configuration fallback order.
---
 .../mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index cdddd91..5792f77 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -232,6 +232,8 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	public void testConfigCpuCores() {
 		Configuration config = getConfiguration();
 		config.setDouble(TaskManagerOptions.CPU_CORES, 1.5);
+		config.setDouble(MesosTaskManagerParameters.MESOS_RM_TASKS_CPUS, 2.5);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
 		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(config);
 		assertThat(mesosTaskManagerParameters.cpus(), is(1.5));
 	}
@@ -240,6 +242,7 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	public void testLegacyConfigCpuCores() {
 		Configuration config = getConfiguration();
 		config.setDouble(MesosTaskManagerParameters.MESOS_RM_TASKS_CPUS, 1.5);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
 		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(config);
 		assertThat(mesosTaskManagerParameters.cpus(), is(1.5));
 	}


[flink] 01/04: [hotfix] Minor clean-up in TaskExecutorProcessUtils.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab3e3906c5f362a539dd7a6fe2f400dfab32a93e
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Feb 24 19:01:46 2020 +0800

    [hotfix] Minor clean-up in TaskExecutorProcessUtils.
---
 .../flink/runtime/clusterframework/TaskExecutorProcessUtils.java  | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
index 31b65d5..917c575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
@@ -640,15 +640,11 @@ public class TaskExecutorProcessUtils {
 		}
 	}
 
-	public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) {
-		return getCpuCores(config, fallback);
-	}
-
 	private static CPUResource getCpuCores(final Configuration config) {
-		return getCpuCores(config, -1.0);
+		return getCpuCoresWithFallback(config, -1.0);
 	}
 
-	private static CPUResource getCpuCores(final Configuration config, double fallback) {
+	public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) {
 		final double cpuCores;
 		if (config.contains(TaskManagerOptions.CPU_CORES)) {
 			cpuCores = config.getDouble(TaskManagerOptions.CPU_CORES);


[flink] 03/04: [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8858d75697ba4a3e888b8a86981b4bb6e1bd558f
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Feb 17 19:51:58 2020 +0800

    [hotfix][yarn][test] Add test cases for validating Yarn deployment respecting the cpu configuration fallback order.
---
 .../apache/flink/yarn/YarnResourceManagerTest.java | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 2798076..54f26b6 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
@@ -526,6 +528,59 @@ public class YarnResourceManagerTest extends TestLogger {
 		}};
 	}
 
+	@Test
+	public void testGetCpuCoresCommonOption() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
+		configuration.setInteger(YarnConfigOptions.VCORES, 2);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		new Context() {{
+			runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(1.0)));
+		}};
+	}
+
+	@Test
+	public void testGetCpuCoresYarnOption() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(YarnConfigOptions.VCORES, 2);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		new Context() {{
+			runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(2.0)));
+		}};
+	}
+
+	@Test
+	public void testGetCpuCoresNumSlots() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		new Context() {{
+			runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(3.0)));
+		}};
+	}
+
+	@Test
+	public void testGetCpuRoundUp() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(TaskManagerOptions.CPU_CORES, 0.5);
+
+		new Context() {{
+			runTest(() -> assertThat(resourceManager.getCpuCores(configuration), is(1.0)));
+		}};
+	}
+
+	@Test(expected = IllegalConfigurationException.class)
+	public void testGetCpuExceedMaxInt() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(TaskManagerOptions.CPU_CORES, Double.MAX_VALUE);
+
+		new Context() {{
+			resourceManager.getCpuCores(configuration);
+		}};
+	}
+
 	private void registerSlotRequest(
 			TestingYarnResourceManager resourceManager,
 			MockResourceManagerRuntimeServices rmServices,


[flink] 02/04: [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac2aaf9277333a6d8ac5aa1c0c81189f56e6ffd4
Author: Xintong Song <to...@gmail.com>
AuthorDate: Mon Feb 17 15:43:16 2020 +0800

    [FLINK-16111][k8s] Fix Kubernetes deployment not respecting 'taskmanager.cpu.cores'.
    
    This closes #11110.
---
 .../kubernetes/KubernetesResourceManager.java      |  3 ++-
 .../kubernetes/KubernetesResourceManagerTest.java  | 28 ++++++++++++++++++++++
 .../MesosTaskManagerParameters.java                |  4 ++--
 .../clusterframework/TaskExecutorProcessUtils.java |  5 ++++
 4 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index 4c94a28..fdf3afc 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -338,6 +339,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 
 	@Override
 	protected double getCpuCores(Configuration configuration) {
-		return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, numSlotsPerTaskManager);
+		return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
 	}
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
index 0d35a2a..08d8f51 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
@@ -80,6 +80,7 @@ import java.util.stream.Collectors;
 
 import static junit.framework.TestCase.assertEquals;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -287,6 +288,33 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
 			Matchers.containsInAnyOrder(taskManagerPrefix + "1-1", taskManagerPrefix + "2-1"));
 	}
 
+	@Test
+	public void testGetCpuCoresCommonOption() {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
+		configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		assertThat(resourceManager.getCpuCores(configuration), is(1.0));
+	}
+
+	@Test
+	public void testGetCpuCoresKubernetesOption() {
+		final Configuration configuration = new Configuration();
+		configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		assertThat(resourceManager.getCpuCores(configuration), is(2.0));
+	}
+
+	@Test
+	public void testGetCpuCoresNumSlots() {
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+
+		assertThat(resourceManager.getCpuCores(configuration), is(3.0));
+	}
+
 	private TestingKubernetesResourceManager createAndStartResourceManager(Configuration configuration) throws Exception {
 
 		final TestingRpcService rpcService = new TestingRpcService(configuration);
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4cd8516..d4bf66f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -75,6 +75,7 @@ public class MesosTaskManagerParameters {
 
 	public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
 		key("mesos.resourcemanager.tasks.cpus")
+		.doubleType()
 		.defaultValue(0.0)
 		.withDescription("CPUs to assign to the Mesos workers.");
 
@@ -424,8 +425,7 @@ public class MesosTaskManagerParameters {
 	}
 
 	private static double getCpuCores(final Configuration configuration) {
-		double fallback = configuration.getDouble(MESOS_RM_TASKS_CPUS);
-		return TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
+		return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, MESOS_RM_TASKS_CPUS);
 	}
 
 	private static MemorySize getTotalProcessMemory(final Configuration configuration) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
index 917c575..9b7ae12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtils.java
@@ -644,6 +644,11 @@ public class TaskExecutorProcessUtils {
 		return getCpuCoresWithFallback(config, -1.0);
 	}
 
+	public static double getCpuCoresWithFallbackConfigOption(final Configuration config, ConfigOption<Double> fallbackOption) {
+		double fallbackValue = config.getDouble(fallbackOption);
+		return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, fallbackValue).getValue().doubleValue();
+	}
+
 	public static CPUResource getCpuCoresWithFallback(final Configuration config, double fallback) {
 		final double cpuCores;
 		if (config.contains(TaskManagerOptions.CPU_CORES)) {