You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/17 15:39:11 UTC

[flink] 09/11: [FLINK-17407] Forward extended resource request to Kubernetes.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dd1f6db59511842672f9427a95060029fe7a759
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Apr 30 15:26:15 2020 +0800

    [FLINK-17407] Forward extended resource request to Kubernetes.
---
 .../flink/kubernetes/KubernetesResourceManager.java |  5 ++++-
 .../decorators/InitJobManagerDecorator.java         |  4 +++-
 .../decorators/InitTaskManagerDecorator.java        |  3 ++-
 .../parameters/KubernetesTaskManagerParameters.java | 10 +++++++++-
 .../flink/kubernetes/utils/KubernetesUtils.java     | 19 +++++++++++++++----
 .../kubeclient/KubernetesTaskManagerTestBase.java   |  4 +++-
 .../decorators/InitTaskManagerDecoratorTest.java    | 21 +++++++++++++++++++++
 .../KubernetesTaskManagerParametersTest.java        |  4 +++-
 8 files changed, 60 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index e8f10c4..b2f8343 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
@@ -313,7 +315,8 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 			flinkConfig,
 			podName,
 			dynamicProperties,
-			taskManagerParameters);
+			taskManagerParameters,
+			ExternalResourceUtils.getExternalResources(flinkConfig, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
 	}
 
 	/**
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
index ad54d71..ef04252 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
@@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -84,7 +85,8 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
 	private Container decorateMainContainer(Container container) {
 		final ResourceRequirements requirements = KubernetesUtils.getResourceRequirements(
 				kubernetesJobManagerParameters.getJobManagerMemoryMB(),
-				kubernetesJobManagerParameters.getJobManagerCPU());
+				kubernetesJobManagerParameters.getJobManagerCPU(),
+				Collections.emptyMap());
 
 		return new ContainerBuilder(container)
 				.withName(kubernetesJobManagerParameters.getJobManagerMainContainerName())
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index f7814bb..fe9e67c 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -79,7 +79,8 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
 	private Container decorateMainContainer(Container container) {
 		final ResourceRequirements resourceRequirements = KubernetesUtils.getResourceRequirements(
 				kubernetesTaskManagerParameters.getTaskManagerMemoryMB(),
-				kubernetesTaskManagerParameters.getTaskManagerCPU());
+				kubernetesTaskManagerParameters.getTaskManagerCPU(),
+				kubernetesTaskManagerParameters.getTaskManagerExternalResources());
 
 		return new ContainerBuilder(container)
 				.withName(kubernetesTaskManagerParameters.getTaskManagerMainContainerName())
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
index f0e763d..bd2c7a5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
@@ -46,15 +46,19 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter
 
 	private final ContaineredTaskManagerParameters containeredTaskManagerParameters;
 
+	private final Map<String, Long> taskManagerExternalResources;
+
 	public KubernetesTaskManagerParameters(
 			Configuration flinkConfig,
 			String podName,
 			String dynamicProperties,
-			ContaineredTaskManagerParameters containeredTaskManagerParameters) {
+			ContaineredTaskManagerParameters containeredTaskManagerParameters,
+			Map<String, Long> taskManagerExternalResources) {
 		super(flinkConfig);
 		this.podName = checkNotNull(podName);
 		this.dynamicProperties = checkNotNull(dynamicProperties);
 		this.containeredTaskManagerParameters = checkNotNull(containeredTaskManagerParameters);
+		this.taskManagerExternalResources = checkNotNull(taskManagerExternalResources);
 	}
 
 	@Override
@@ -102,6 +106,10 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter
 		return containeredTaskManagerParameters.getTaskExecutorProcessSpec().getCpuCores().getValue().doubleValue();
 	}
 
+	public Map<String, Long> getTaskManagerExternalResources() {
+		return taskManagerExternalResources;
+	}
+
 	public int getRPCPort() {
 		final int taskManagerRpcPort = KubernetesUtils.parsePort(flinkConfig, TaskManagerOptions.RPC_PORT);
 		checkArgument(taskManagerRpcPort > 0, "%s should not be 0.", TaskManagerOptions.RPC_PORT.key());
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index 877997b..3abc0a2 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -130,18 +130,29 @@ public class KubernetesUtils {
 	 *
 	 * @param mem Memory in mb.
 	 * @param cpu cpu.
+	 * @param externalResources external resources
 	 * @return KubernetesResource requirements.
 	 */
-	public static ResourceRequirements getResourceRequirements(int mem, double cpu) {
+	public static ResourceRequirements getResourceRequirements(int mem, double cpu, Map<String, Long> externalResources) {
 		final Quantity cpuQuantity = new Quantity(String.valueOf(cpu));
 		final Quantity memQuantity = new Quantity(mem + Constants.RESOURCE_UNIT_MB);
 
-		return new ResourceRequirementsBuilder()
+		ResourceRequirementsBuilder resourceRequirementsBuilder = new ResourceRequirementsBuilder()
 			.addToRequests(Constants.RESOURCE_NAME_MEMORY, memQuantity)
 			.addToRequests(Constants.RESOURCE_NAME_CPU, cpuQuantity)
 			.addToLimits(Constants.RESOURCE_NAME_MEMORY, memQuantity)
-			.addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity)
-			.build();
+			.addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity);
+
+		// Add the external resources to resource requirement.
+		for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) {
+			final Quantity resourceQuantity = new Quantity(String.valueOf(externalResource.getValue()));
+			resourceRequirementsBuilder
+				.addToRequests(externalResource.getKey(), resourceQuantity)
+				.addToLimits(externalResource.getKey(), resourceQuantity);
+			LOG.info("Request external resource {} with config key {}.", resourceQuantity.getAmount(), externalResource.getKey());
+		}
+
+		return resourceRequirementsBuilder.build();
 	}
 
 	public static String getCommonStartCommand(
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java
index e71b52b..1a9b365 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerPa
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -94,6 +95,7 @@ public class KubernetesTaskManagerTestBase extends KubernetesTestBase {
 				flinkConfig,
 				POD_NAME,
 				DYNAMIC_PROPERTIES,
-				containeredTaskManagerParameters);
+				containeredTaskManagerParameters,
+				ExternalResourceUtils.getExternalResources(flinkConfig, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
 	}
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index 23f87be..e2d2c5f 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.kubernetes.kubeclient.decorators;
 
+import org.apache.flink.configuration.ExternalResourceOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase;
@@ -65,6 +66,10 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
 		new Toleration("NoSchedule", "key1", "Equal", null, "value1"),
 		new Toleration("NoExecute", "key2", "Exists", 6000L, null));
 
+	private static final String RESOURCE_NAME = "test";
+	private static final Long RESOURCE_AMOUNT = 2L;
+	private static final String RESOURCE_CONFIG_KEY = "test.com/test";
+
 	private Pod resultPod;
 	private Container resultMainContainer;
 
@@ -75,6 +80,11 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
 		this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
 		this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, ANNOTATIONS);
 		this.flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
+
+		// Set up external resource configs
+		flinkConfig.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), RESOURCE_NAME);
+		flinkConfig.setLong(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT);
+		flinkConfig.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX), RESOURCE_CONFIG_KEY);
 	}
 
 	@Override
@@ -125,6 +135,17 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
 	}
 
 	@Test
+	public void testExternalResourceInResourceRequirements() {
+		final ResourceRequirements resourceRequirements = this.resultMainContainer.getResources();
+
+		final Map<String, Quantity> requests = resourceRequirements.getRequests();
+		assertEquals(Long.toString(RESOURCE_AMOUNT), requests.get(RESOURCE_CONFIG_KEY).getAmount());
+
+		final Map<String, Quantity> limits = resourceRequirements.getLimits();
+		assertEquals(Long.toString(RESOURCE_AMOUNT), limits.get(RESOURCE_CONFIG_KEY).getAmount());
+	}
+
+	@Test
 	public void testMainContainerPorts() {
 		final List<ContainerPort> expectedContainerPorts = Collections.singletonList(
 			new ContainerPortBuilder()
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java
index 62c6127..203ac97 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -84,7 +85,8 @@ public class KubernetesTaskManagerParametersTest extends KubernetesTestBase {
 		this.kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(flinkConfig,
 			POD_NAME,
 			DYNAMIC_PROPERTIES,
-			containeredTaskManagerParameters);
+			containeredTaskManagerParameters,
+			Collections.emptyMap());
 	}
 
 	@Test