You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/17 15:39:11 UTC
[flink] 09/11: [FLINK-17407] Forward extended resource request to
Kubernetes.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9dd1f6db59511842672f9427a95060029fe7a759
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Thu Apr 30 15:26:15 2020 +0800
[FLINK-17407] Forward extended resource request to Kubernetes.
---
.../flink/kubernetes/KubernetesResourceManager.java | 5 ++++-
.../decorators/InitJobManagerDecorator.java | 4 +++-
.../decorators/InitTaskManagerDecorator.java | 3 ++-
.../parameters/KubernetesTaskManagerParameters.java | 10 +++++++++-
.../flink/kubernetes/utils/KubernetesUtils.java | 19 +++++++++++++++----
.../kubeclient/KubernetesTaskManagerTestBase.java | 4 +++-
.../decorators/InitTaskManagerDecoratorTest.java | 21 +++++++++++++++++++++
.../KubernetesTaskManagerParametersTest.java | 4 +++-
8 files changed, 60 insertions(+), 10 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index e8f10c4..b2f8343 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
@@ -313,7 +315,8 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
flinkConfig,
podName,
dynamicProperties,
- taskManagerParameters);
+ taskManagerParameters,
+ ExternalResourceUtils.getExternalResources(flinkConfig, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
}
/**
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
index ad54d71..ef04252 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
@@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -84,7 +85,8 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
private Container decorateMainContainer(Container container) {
final ResourceRequirements requirements = KubernetesUtils.getResourceRequirements(
kubernetesJobManagerParameters.getJobManagerMemoryMB(),
- kubernetesJobManagerParameters.getJobManagerCPU());
+ kubernetesJobManagerParameters.getJobManagerCPU(),
+ Collections.emptyMap());
return new ContainerBuilder(container)
.withName(kubernetesJobManagerParameters.getJobManagerMainContainerName())
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index f7814bb..fe9e67c 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -79,7 +79,8 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
private Container decorateMainContainer(Container container) {
final ResourceRequirements resourceRequirements = KubernetesUtils.getResourceRequirements(
kubernetesTaskManagerParameters.getTaskManagerMemoryMB(),
- kubernetesTaskManagerParameters.getTaskManagerCPU());
+ kubernetesTaskManagerParameters.getTaskManagerCPU(),
+ kubernetesTaskManagerParameters.getTaskManagerExternalResources());
return new ContainerBuilder(container)
.withName(kubernetesTaskManagerParameters.getTaskManagerMainContainerName())
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
index f0e763d..bd2c7a5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
@@ -46,15 +46,19 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter
private final ContaineredTaskManagerParameters containeredTaskManagerParameters;
+ private final Map<String, Long> taskManagerExternalResources;
+
public KubernetesTaskManagerParameters(
Configuration flinkConfig,
String podName,
String dynamicProperties,
- ContaineredTaskManagerParameters containeredTaskManagerParameters) {
+ ContaineredTaskManagerParameters containeredTaskManagerParameters,
+ Map<String, Long> taskManagerExternalResources) {
super(flinkConfig);
this.podName = checkNotNull(podName);
this.dynamicProperties = checkNotNull(dynamicProperties);
this.containeredTaskManagerParameters = checkNotNull(containeredTaskManagerParameters);
+ this.taskManagerExternalResources = checkNotNull(taskManagerExternalResources);
}
@Override
@@ -102,6 +106,10 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter
return containeredTaskManagerParameters.getTaskExecutorProcessSpec().getCpuCores().getValue().doubleValue();
}
+ public Map<String, Long> getTaskManagerExternalResources() {
+ return taskManagerExternalResources;
+ }
+
public int getRPCPort() {
final int taskManagerRpcPort = KubernetesUtils.parsePort(flinkConfig, TaskManagerOptions.RPC_PORT);
checkArgument(taskManagerRpcPort > 0, "%s should not be 0.", TaskManagerOptions.RPC_PORT.key());
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index 877997b..3abc0a2 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -130,18 +130,29 @@ public class KubernetesUtils {
*
* @param mem Memory in mb.
* @param cpu cpu.
+ * @param externalResources external resources
* @return KubernetesResource requirements.
*/
- public static ResourceRequirements getResourceRequirements(int mem, double cpu) {
+ public static ResourceRequirements getResourceRequirements(int mem, double cpu, Map<String, Long> externalResources) {
final Quantity cpuQuantity = new Quantity(String.valueOf(cpu));
final Quantity memQuantity = new Quantity(mem + Constants.RESOURCE_UNIT_MB);
- return new ResourceRequirementsBuilder()
+ ResourceRequirementsBuilder resourceRequirementsBuilder = new ResourceRequirementsBuilder()
.addToRequests(Constants.RESOURCE_NAME_MEMORY, memQuantity)
.addToRequests(Constants.RESOURCE_NAME_CPU, cpuQuantity)
.addToLimits(Constants.RESOURCE_NAME_MEMORY, memQuantity)
- .addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity)
- .build();
+ .addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity);
+
+ // Add the external resources to resource requirement.
+ for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) {
+ final Quantity resourceQuantity = new Quantity(String.valueOf(externalResource.getValue()));
+ resourceRequirementsBuilder
+ .addToRequests(externalResource.getKey(), resourceQuantity)
+ .addToLimits(externalResource.getKey(), resourceQuantity);
+ LOG.info("Request external resource {} with config key {}.", resourceQuantity.getAmount(), externalResource.getKey());
+ }
+
+ return resourceRequirementsBuilder.build();
}
public static String getCommonStartCommand(
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java
index e71b52b..1a9b365 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/KubernetesTaskManagerTestBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerPa
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import java.util.HashMap;
import java.util.Map;
@@ -94,6 +95,7 @@ public class KubernetesTaskManagerTestBase extends KubernetesTestBase {
flinkConfig,
POD_NAME,
DYNAMIC_PROPERTIES,
- containeredTaskManagerParameters);
+ containeredTaskManagerParameters,
+ ExternalResourceUtils.getExternalResources(flinkConfig, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index 23f87be..e2d2c5f 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.kubeclient.decorators;
+import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase;
@@ -65,6 +66,10 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
new Toleration("NoSchedule", "key1", "Equal", null, "value1"),
new Toleration("NoExecute", "key2", "Exists", 6000L, null));
+ private static final String RESOURCE_NAME = "test";
+ private static final Long RESOURCE_AMOUNT = 2L;
+ private static final String RESOURCE_CONFIG_KEY = "test.com/test";
+
private Pod resultPod;
private Container resultMainContainer;
@@ -75,6 +80,11 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
+
+ // Set up external resource configs
+ flinkConfig.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), RESOURCE_NAME);
+ flinkConfig.setLong(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT);
+ flinkConfig.setString(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX), RESOURCE_CONFIG_KEY);
}
@Override
@@ -125,6 +135,17 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
}
@Test
+ public void testExternalResourceInResourceRequirements() {
+ final ResourceRequirements resourceRequirements = this.resultMainContainer.getResources();
+
+ final Map<String, Quantity> requests = resourceRequirements.getRequests();
+ assertEquals(Long.toString(RESOURCE_AMOUNT), requests.get(RESOURCE_CONFIG_KEY).getAmount());
+
+ final Map<String, Quantity> limits = resourceRequirements.getLimits();
+ assertEquals(Long.toString(RESOURCE_AMOUNT), limits.get(RESOURCE_CONFIG_KEY).getAmount());
+ }
+
+ @Test
public void testMainContainerPorts() {
final List<ContainerPort> expectedContainerPorts = Collections.singletonList(
new ContainerPortBuilder()
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java
index 62c6127..203ac97 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -84,7 +85,8 @@ public class KubernetesTaskManagerParametersTest extends KubernetesTestBase {
this.kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(flinkConfig,
POD_NAME,
DYNAMIC_PROPERTIES,
- containeredTaskManagerParameters);
+ containeredTaskManagerParameters,
+ Collections.emptyMap());
}
@Test