You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/09 20:43:16 UTC

[airflow] 02/04: Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 091fae90a0a564e2da92ead7dd5be2c1e8b56301
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Apr 5 16:56:00 2021 +0100

    Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197)
    
    This feature was added in https://github.com/apache/airflow/pull/11784 but
    it was broken because the executor read the `pod_template_override` key
    from `executor_config` instead of the `pod_template_file` key.
    
    closes #14199
    
    (cherry picked from commit 5606137ba32c0daa87d557301d82f7f2bdc0b0a4)
---
 .../example_kubernetes_executor_config.py          |  3 +-
 airflow/executors/kubernetes_executor.py           |  2 +-
 .../basic_template.yaml                            |  4 +-
 docs/apache-airflow/executor/kubernetes.rst        |  2 +-
 .../basic_template.yaml                            | 34 ++++++++
 tests/executors/test_kubernetes_executor.py        | 91 +++++++++++++++++++++-
 6 files changed, 130 insertions(+), 6 deletions(-)

diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index cbd69cb..5290dd8 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -24,6 +24,7 @@ import os
 from airflow import DAG
 from airflow.example_dags.libs.helper import print_stuff
 from airflow.operators.python import PythonOperator
+from airflow.settings import AIRFLOW_HOME
 from airflow.utils.dates import days_ago
 
 default_args = {
@@ -110,7 +111,7 @@ try:
             task_id="task_with_template",
             python_callable=print_stuff,
             executor_config={
-                "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+                "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
                 "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
             },
         )
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 7e3d82b..ec7cbf7 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -496,7 +496,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             return
 
         if executor_config:
-            pod_template_file = executor_config.get("pod_template_override", None)
+            pod_template_file = executor_config.get("pod_template_file", None)
         else:
             pod_template_file = None
         if not self.task_queue:
diff --git a/airflow/kubernetes_executor_templates/basic_template.yaml b/airflow/kubernetes_executor_templates/basic_template.yaml
index a953867..a6eb83f 100644
--- a/airflow/kubernetes_executor_templates/basic_template.yaml
+++ b/airflow/kubernetes_executor_templates/basic_template.yaml
@@ -69,8 +69,8 @@ spec:
         defaultMode: 420
   restartPolicy: Never
   terminationGracePeriodSeconds: 30
-  serviceAccountName: airflow-worker-serviceaccount
-  serviceAccount: airflow-worker-serviceaccount
+  serviceAccountName: airflow-worker
+  serviceAccount: airflow-worker
   securityContext:
     runAsUser: 50000
     fsGroup: 50000
diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 217a29c..61d13f4 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -125,7 +125,7 @@ name ``base`` and a second container containing your desired sidecar.
     :end-before: [END task_with_sidecar]
 
 You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
-This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``.
+This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``.
 
 Here is an example of a task with both features:
 
diff --git a/tests/executors/kubernetes_executor_template_files/basic_template.yaml b/tests/executors/kubernetes_executor_template_files/basic_template.yaml
new file mode 100644
index 0000000..1fb00f2
--- /dev/null
+++ b/tests/executors/kubernetes_executor_template_files/basic_template.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+kind: Pod
+apiVersion: v1
+metadata:
+  name: dummy-name-dont-delete
+  namespace: dummy-name-dont-delete
+  labels:
+    mylabel: foo
+spec:
+  containers:
+    - name: base
+      image: dummy-name-dont-delete
+  securityContext:
+    runAsUser: 50000
+    fsGroup: 50000
+  imagePullSecrets:
+    - name: airflow-registry
+  schedulerName: default-scheduler
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 68b0006..8d3d5b4 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import pathlib
 import random
 import re
 import string
@@ -22,6 +23,7 @@ import unittest
 from datetime import datetime
 from unittest import mock
 
+import pytest
 from kubernetes.client import models as k8s
 from urllib3 import HTTPResponse
 
@@ -39,7 +41,7 @@ try:
         get_base_pod_from_template,
     )
     from airflow.kubernetes import pod_generator
-    from airflow.kubernetes.pod_generator import PodGenerator
+    from airflow.kubernetes.pod_generator import PodGenerator, datetime_to_label_safe_datestring
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -215,6 +217,93 @@ class TestKubernetesExecutor(unittest.TestCase):
 
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
+    @pytest.mark.execution_timeout(10)
+    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+        current_folder = pathlib.Path(__file__).parent.absolute()
+        template_file = str(
+            (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+        )
+
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_get_kube_client.return_value = mock_kube_client
+
+        with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+            executor = self.kubernetes_executor
+            executor.start()
+
+            assert executor.event_buffer == {}
+            assert executor.task_queue.empty()
+
+            execution_date = datetime.utcnow()
+
+            executor.execute_async(
+                key=('dag', 'task', execution_date, 1),
+                queue=None,
+                command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                executor_config={
+                    "pod_template_file": template_file,
+                    "pod_override": k8s.V1Pod(
+                        metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+                        spec=k8s.V1PodSpec(
+                            containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+                        ),
+                    ),
+                },
+            )
+
+            assert not executor.task_queue.empty()
+            task = executor.task_queue.get_nowait()
+            _, _, expected_executor_config, expected_pod_template_file = task
+
+            # Test that the correct values have been put to queue
+            assert expected_executor_config.metadata.labels == {'release': 'stable'}
+            assert expected_pod_template_file == template_file
+
+            self.kubernetes_executor.kube_scheduler.run_next(task)
+            mock_run_pod_async.assert_called_once_with(
+                k8s.V1Pod(
+                    api_version="v1",
+                    kind="Pod",
+                    metadata=k8s.V1ObjectMeta(
+                        name=mock.ANY,
+                        namespace="default",
+                        annotations={
+                            'dag_id': 'dag',
+                            'execution_date': execution_date.isoformat(),
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                        labels={
+                            'airflow-worker': '5',
+                            'airflow_version': mock.ANY,
+                            'dag_id': 'dag',
+                            'execution_date': datetime_to_label_safe_datestring(execution_date),
+                            'kubernetes_executor': 'True',
+                            'mylabel': 'foo',
+                            'release': 'stable',
+                            'task_id': 'task',
+                            'try_number': '1',
+                        },
+                    ),
+                    spec=k8s.V1PodSpec(
+                        containers=[
+                            k8s.V1Container(
+                                name="base",
+                                image="airflow:3.6",
+                                args=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+                                env=[k8s.V1EnvVar(name='AIRFLOW_IS_K8S_EXECUTOR_POD', value='True')],
+                            )
+                        ],
+                        image_pull_secrets=[k8s.V1LocalObjectReference(name='airflow-registry')],
+                        scheduler_name='default-scheduler',
+                        security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
+                    ),
+                )
+            )
+
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):