You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/22 18:26:29 UTC

[airflow] 01/03: Fix broken Kubernetes PodRuntimeInfoEnv (#10478)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d784d8b2047b5b48ee242adb14b56a00522060ed
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Aug 22 17:52:41 2020 +0100

    Fix broken Kubernetes PodRuntimeInfoEnv (#10478)
    
    closes https://github.com/apache/airflow/issues/10456
    
    (cherry picked from commit 47c6657ce012f6db147fdcce3ca5e77f46a9e491)
---
 airflow/kubernetes/pod_runtime_info_env.py       |  2 +-
 kubernetes_tests/test_kubernetes_pod_operator.py | 38 +++++++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/airflow/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py
index 72e2151..95fbe6b 100644
--- a/airflow/kubernetes/pod_runtime_info_env.py
+++ b/airflow/kubernetes/pod_runtime_info_env.py
@@ -47,7 +47,7 @@ class PodRuntimeInfoEnv(K8SModel):
             name=self.name,
             value_from=k8s.V1EnvVarSource(
                 field_ref=k8s.V1ObjectFieldSelector(
-                    self.field_path
+                    field_path=self.field_path
                 )
             )
         )
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 89572f9..a5c9731 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -32,6 +32,7 @@ from airflow.kubernetes import kube_client
 from airflow.kubernetes.pod import Port
 from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
 from airflow.kubernetes.secret import Secret
 from airflow.kubernetes.volume import Volume
 from airflow.kubernetes.volume_mount import VolumeMount
@@ -669,7 +670,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
 
     def test_pod_failure(self):
         """
-            Tests that the task fails when a pod reports a failure
+        Tests that the task fails when a pod reports a failure
         """
         bad_internal_command = ["foobar 10 "]
         k = KubernetesPodOperator(
@@ -780,6 +781,41 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             ))]
         )
 
+    def test_env_vars(self):
+        # WHEN
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            env_vars={"ENV1": "val1", "ENV2": "val2", },
+            pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+        )
+
+        context = create_context(k)
+        k.execute(context)
+
+        # THEN
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['env'] = [
+            {'name': 'ENV1', 'value': 'val1'},
+            {'name': 'ENV2', 'value': 'val2'},
+            {
+                'name': 'ENV3',
+                'valueFrom': {
+                    'fieldRef': {
+                        'fieldPath': 'status.podIP'
+                    }
+                }
+            }
+        ]
+        self.assertEqual(self.expected_pod, actual_pod)
+
     def test_init_container(self):
         # GIVEN
         volume_mounts = [k8s.V1VolumeMount(