You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/12/07 08:11:53 UTC

(flink-kubernetes-operator) branch main updated: [FLINK-33645] Taskmanager env vars in config not given to taskmanager

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e4528260 [FLINK-33645] Taskmanager env vars in config not given to taskmanager
e4528260 is described below

commit e4528260be78dabeb6552afda10e78802021984d
Author: Tony Garrard <ga...@uk.ibm.com>
AuthorDate: Thu Dec 7 08:11:48 2023 +0000

    [FLINK-33645] Taskmanager env vars in config not given to taskmanager
    
    
    Signed-off-by: A. Garrard <GA...@uk.ibm.com>
---
 .../StandaloneKubernetesTaskManagerParameters.java |  8 ++++++--
 .../operator/kubeclient/utils/TestUtils.java       |  5 +++++
 .../KubernetesStandaloneClusterDescriptorTest.java | 22 ++++++++++++++++++++++
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
index de313f27..2517b756 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java
@@ -19,6 +19,8 @@ package org.apache.flink.kubernetes.operator.kubeclient.parameters;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -73,8 +75,10 @@ public class StandaloneKubernetesTaskManagerParameters extends AbstractKubernete
 
     @Override
     public Map<String, String> getEnvironments() {
-        // TMs have environment set using the pod template.
-        return new HashMap<>();
+        // TMs have environment set using the pod template and config containerized.taskmanager.env
+        return new HashMap<>(
+                ConfigurationUtils.getPrefixedKeyValuePairs(
+                        ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX, flinkConfig));
     }
 
     @Override
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
index 8d73dd10..203275f8 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java
@@ -54,6 +54,11 @@ public class TestUtils {
     public static final double TASK_MANAGER_CPU = 4;
     public static final double JOB_MANAGER_CPU = 2;
 
+    public static final String USER_ENV_VAR = "USER_ENV";
+
+    public static final String JM_ENV_VALUE = "TEST_JM";
+    public static final String TM_ENV_VALUE = "TEST_TM";
+
     public static Map<String, String> generateTestStringStringMap(
             String keyPrefix, String valuePrefix, int entries) {
         Map<String, String> map = new HashMap<>();
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
index 921bccf0..b53c51a8 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -21,12 +21,14 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.EnvVar;
 import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.apps.Deployment;
 import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Config;
 import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.ConfigBuilder;
@@ -43,7 +45,10 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.JM_ENV_VALUE;
 import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.TEST_NAMESPACE;
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.TM_ENV_VALUE;
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.USER_ENV_VAR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -76,6 +81,12 @@ public class KubernetesStandaloneClusterDescriptorTest {
         flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0));
         flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0));
         flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0));
+        flinkConfig.setString(
+                ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + USER_ENV_VAR,
+                JM_ENV_VALUE);
+        flinkConfig.setString(
+                ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + USER_ENV_VAR,
+                TM_ENV_VALUE);
 
         var clusterClientProvider = clusterDescriptor.deploySessionCluster(clusterSpecification);
 
@@ -111,6 +122,17 @@ public class KubernetesStandaloneClusterDescriptorTest {
         assertTrue(
                 jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
                         .anyMatch(c -> c.getArgs().contains("jobmanager")));
+        List<EnvVar> envVars =
+                jmDeployment.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv();
+        assertTrue(envVars.contains(new EnvVar(USER_ENV_VAR, JM_ENV_VALUE, null)));
+
+        Deployment tmDeployment =
+                deployments.stream()
+                        .filter(d -> d.getMetadata().getName().equals(expectedTMDeploymentName))
+                        .findFirst()
+                        .orElse(null);
+        envVars = tmDeployment.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv();
+        assertTrue(envVars.contains(new EnvVar(USER_ENV_VAR, TM_ENV_VALUE, null)));
 
         var clusterClient = clusterClientProvider.getClusterClient();