You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/12 16:21:30 UTC

[airflow] branch v1-10-stable updated: KubernetesExecutor should accept images from executor_config (#13074)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new ee88f5f  KubernetesExecutor should accept images from executor_config (#13074)
ee88f5f is described below

commit ee88f5f46ef32d2eb5e9be56e723de58b6736375
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Tue Jan 12 08:21:19 2021 -0800

    KubernetesExecutor should accept images from executor_config (#13074)
    
    Addresses: https://github.com/apache/airflow/issues/13003
    
    Users should be able to specify custom images in the executor_config. Not being
    able to do so is a regression.
---
 airflow/kubernetes/pod_generator.py    |  8 ++-
 tests/kubernetes/test_pod_generator.py | 89 ++++++++++++++++++++++++++++++++++
 2 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 2d30ca9..4df3198 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -535,9 +535,15 @@ class PodGenerator(object):
             - executor_config
             - dynamic arguments
         """
+        try:
+            image = pod_override_object.spec.containers[0].image  # type: ignore
+            if not image:
+                image = kube_image
+        except Exception:  # pylint: disable=W0703
+            image = kube_image
         dynamic_pod = PodGenerator(
             namespace=namespace,
-            image=kube_image,
+            image=image,
             labels={
                 'airflow-worker': worker_uuid,
                 'dag_id': make_safe_label_value(dag_id),
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index fed7c97..1f907f7 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -846,6 +846,95 @@ class TestPodGenerator(unittest.TestCase):
         }, sanitized_result)
 
     @mock.patch('uuid.uuid4')
+    def test_construct_with_image(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        mock_uuid.return_value = self.static_uuid
+        worker_config = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                name='gets-overridden-by-dynamic-args',
+                annotations={
+                    'should': 'stay'
+                }
+            ),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='base',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        ),
+                        security_context=k8s.V1SecurityContext(
+                            run_as_user=1
+                        )
+                    )
+                ]
+            )
+        )
+        executor_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='base',
+                        image="my-image",
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '2m',
+                                'memory': '2G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+        result = PodGenerator.construct_pod(
+            dag_id='dag_id',
+            task_id='task_id',
+            pod_id='pod_id',
+            try_number=3,
+            kube_image='kube_image',
+            date=self.execution_date,
+            command=['command'],
+            pod_override_object=executor_config,
+            base_worker_pod=worker_config,
+            namespace='namespace',
+            worker_uuid='uuid',
+        )
+        self.assertEqual(result.spec.containers[0].image, "my-image")
+
+        executor_config_no_image = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='base',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '2m',
+                                'memory': '2G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+        result = PodGenerator.construct_pod(
+            dag_id='dag_id',
+            task_id='task_id',
+            pod_id='pod_id',
+            try_number=3,
+            kube_image='kube_image',
+            date=self.execution_date,
+            command=['command'],
+            pod_override_object=executor_config_no_image,
+            base_worker_pod=worker_config,
+            namespace='namespace',
+            worker_uuid='uuid',
+        )
+        self.assertEqual(result.spec.containers[0].image, "kube_image")
+
+    @mock.patch('uuid.uuid4')
     def test_construct_pod_with_mutation(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         worker_config = k8s.V1Pod(