You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/19 23:08:57 UTC
[airflow] branch v1-10-stable updated: Make KubernetesExecutor recognize kubernetes_labels (#10412)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-stable by this push:
new db72074 Make KubernetesExecutor recognize kubernetes_labels (#10412)
db72074 is described below
commit db720743c5f5828fabed9dd88384c86a41e2f997
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Wed Aug 19 16:06:56 2020 -0700
Make KubernetesExecutor recognize kubernetes_labels (#10412)
KubernetesExecutor needs to inject the `kubernetes_labels` config
into the worker_config.
(cherry picked from commit e195c6a3d261ad44d7be4c8d1f788e86d55ea2f5)
---
airflow/kubernetes/pod_generator.py | 5 +++--
airflow/kubernetes/worker_configuration.py | 1 +
tests/kubernetes/test_worker_configuration.py | 1 +
3 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 2c7145d..ed518d1 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -614,8 +614,9 @@ def merge_objects(base_obj, client_obj):
client_obj_cp = copy.deepcopy(client_obj)
if isinstance(base_obj, dict) and isinstance(client_obj_cp, dict):
- client_obj_cp.update(base_obj)
- return client_obj_cp
+ base_obj_cp = copy.deepcopy(base_obj)
+ base_obj_cp.update(client_obj_cp)
+ return base_obj_cp
for base_key in base_obj.to_dict().keys():
base_val = getattr(base_obj, base_key, None)
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 5214d4c..327d7d0 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -442,6 +442,7 @@ class WorkerConfiguration(LoggingMixin):
volumes=self._get_volumes(),
volume_mounts=self._get_volume_mounts(),
init_containers=self._get_init_containers(),
+ labels=self.kube_config.kube_labels,
annotations=self.kube_config.kube_annotations,
affinity=self.kube_config.kube_affinity,
tolerations=self.kube_config.kube_tolerations,
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 59c9326..40271dc 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -391,6 +391,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
'dag_id': 'test_dag_id',
'execution_date': '2019-11-21 11:08:22.920875',
'kubernetes_executor': 'True',
+ 'my_label': 'label_id',
'task_id': 'test_task_id',
'try_number': '1'
}