You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/09 20:43:16 UTC
[airflow] 02/04: Bugfix: Fix overriding `pod_template_file` in
KubernetesExecutor (#15197)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 091fae90a0a564e2da92ead7dd5be2c1e8b56301
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Apr 5 16:56:00 2021 +0100
Bugfix: Fix overriding `pod_template_file` in KubernetesExecutor (#15197)
This feature was added in https://github.com/apache/airflow/pull/11784 but
it was broken as it got `pod_template_override` from `executor_config`
instead of `pod_template_file`.
closes #14199
(cherry picked from commit 5606137ba32c0daa87d557301d82f7f2bdc0b0a4)
---
.../example_kubernetes_executor_config.py | 3 +-
airflow/executors/kubernetes_executor.py | 2 +-
.../basic_template.yaml | 4 +-
docs/apache-airflow/executor/kubernetes.rst | 2 +-
.../basic_template.yaml | 34 ++++++++
tests/executors/test_kubernetes_executor.py | 91 +++++++++++++++++++++-
6 files changed, 130 insertions(+), 6 deletions(-)
diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index cbd69cb..5290dd8 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -24,6 +24,7 @@ import os
from airflow import DAG
from airflow.example_dags.libs.helper import print_stuff
from airflow.operators.python import PythonOperator
+from airflow.settings import AIRFLOW_HOME
from airflow.utils.dates import days_ago
default_args = {
@@ -110,7 +111,7 @@ try:
task_id="task_with_template",
python_callable=print_stuff,
executor_config={
- "pod_template_file": "/usr/local/airflow/pod_templates/basic_template.yaml",
+ "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
},
)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 7e3d82b..ec7cbf7 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -496,7 +496,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
return
if executor_config:
- pod_template_file = executor_config.get("pod_template_override", None)
+ pod_template_file = executor_config.get("pod_template_file", None)
else:
pod_template_file = None
if not self.task_queue:
diff --git a/airflow/kubernetes_executor_templates/basic_template.yaml b/airflow/kubernetes_executor_templates/basic_template.yaml
index a953867..a6eb83f 100644
--- a/airflow/kubernetes_executor_templates/basic_template.yaml
+++ b/airflow/kubernetes_executor_templates/basic_template.yaml
@@ -69,8 +69,8 @@ spec:
defaultMode: 420
restartPolicy: Never
terminationGracePeriodSeconds: 30
- serviceAccountName: airflow-worker-serviceaccount
- serviceAccount: airflow-worker-serviceaccount
+ serviceAccountName: airflow-worker
+ serviceAccount: airflow-worker
securityContext:
runAsUser: 50000
fsGroup: 50000
diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst
index 217a29c..61d13f4 100644
--- a/docs/apache-airflow/executor/kubernetes.rst
+++ b/docs/apache-airflow/executor/kubernetes.rst
@@ -125,7 +125,7 @@ name ``base`` and a second container containing your desired sidecar.
:end-before: [END task_with_sidecar]
You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
-This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override_spec``.
+This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``.
Here is an example of a task with both features:
diff --git a/tests/executors/kubernetes_executor_template_files/basic_template.yaml b/tests/executors/kubernetes_executor_template_files/basic_template.yaml
new file mode 100644
index 0000000..1fb00f2
--- /dev/null
+++ b/tests/executors/kubernetes_executor_template_files/basic_template.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+kind: Pod
+apiVersion: v1
+metadata:
+ name: dummy-name-dont-delete
+ namespace: dummy-name-dont-delete
+ labels:
+ mylabel: foo
+spec:
+ containers:
+ - name: base
+ image: dummy-name-dont-delete
+ securityContext:
+ runAsUser: 50000
+ fsGroup: 50000
+ imagePullSecrets:
+ - name: airflow-registry
+ schedulerName: default-scheduler
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 68b0006..8d3d5b4 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
+import pathlib
import random
import re
import string
@@ -22,6 +23,7 @@ import unittest
from datetime import datetime
from unittest import mock
+import pytest
from kubernetes.client import models as k8s
from urllib3 import HTTPResponse
@@ -39,7 +41,7 @@ try:
get_base_pod_from_template,
)
from airflow.kubernetes import pod_generator
- from airflow.kubernetes.pod_generator import PodGenerator
+ from airflow.kubernetes.pod_generator import PodGenerator, datetime_to_label_safe_datestring
from airflow.utils.state import State
except ImportError:
AirflowKubernetesScheduler = None # type: ignore
@@ -215,6 +217,93 @@ class TestKubernetesExecutor(unittest.TestCase):
assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+ @pytest.mark.execution_timeout(10)
+ @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+ @mock.patch('airflow.kubernetes.pod_launcher.PodLauncher.run_pod_async')
+ @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+ def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
+ current_folder = pathlib.Path(__file__).parent.absolute()
+ template_file = str(
+ (current_folder / "kubernetes_executor_template_files" / "basic_template.yaml").absolute()
+ )
+
+ mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+ mock_get_kube_client.return_value = mock_kube_client
+
+ with conf_vars({('kubernetes', 'pod_template_file'): ''}):
+ executor = self.kubernetes_executor
+ executor.start()
+
+ assert executor.event_buffer == {}
+ assert executor.task_queue.empty()
+
+ execution_date = datetime.utcnow()
+
+ executor.execute_async(
+ key=('dag', 'task', execution_date, 1),
+ queue=None,
+ command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+ executor_config={
+ "pod_template_file": template_file,
+ "pod_override": k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+ spec=k8s.V1PodSpec(
+ containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+ ),
+ ),
+ },
+ )
+
+ assert not executor.task_queue.empty()
+ task = executor.task_queue.get_nowait()
+ _, _, expected_executor_config, expected_pod_template_file = task
+
+ # Test that the correct values have been put to queue
+ assert expected_executor_config.metadata.labels == {'release': 'stable'}
+ assert expected_pod_template_file == template_file
+
+ self.kubernetes_executor.kube_scheduler.run_next(task)
+ mock_run_pod_async.assert_called_once_with(
+ k8s.V1Pod(
+ api_version="v1",
+ kind="Pod",
+ metadata=k8s.V1ObjectMeta(
+ name=mock.ANY,
+ namespace="default",
+ annotations={
+ 'dag_id': 'dag',
+ 'execution_date': execution_date.isoformat(),
+ 'task_id': 'task',
+ 'try_number': '1',
+ },
+ labels={
+ 'airflow-worker': '5',
+ 'airflow_version': mock.ANY,
+ 'dag_id': 'dag',
+ 'execution_date': datetime_to_label_safe_datestring(execution_date),
+ 'kubernetes_executor': 'True',
+ 'mylabel': 'foo',
+ 'release': 'stable',
+ 'task_id': 'task',
+ 'try_number': '1',
+ },
+ ),
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ image="airflow:3.6",
+ args=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+ env=[k8s.V1EnvVar(name='AIRFLOW_IS_K8S_EXECUTOR_POD', value='True')],
+ )
+ ],
+ image_pull_secrets=[k8s.V1LocalObjectReference(name='airflow-registry')],
+ scheduler_name='default-scheduler',
+ security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
+ ),
+ )
+ )
+
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):