You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2022/04/29 21:36:06 UTC

[airflow] branch main updated: Fix ``KubernetesPodOperator`` with `KubernetesExecutor`` on 2.3.0 (#23371)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e3abe4180 Fix ``KubernetesPodOperator`` with `KubernetesExecutor`` on 2.3.0 (#23371)
8e3abe4180 is described below

commit 8e3abe418021a3ba241ead1cad79a1c5b492c587
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Apr 29 15:35:44 2022 -0600

    Fix ``KubernetesPodOperator`` with `KubernetesExecutor`` on 2.3.0 (#23371)
    
    KubernetesPodOperator was mistakenly trying to reattach to it's
    KubernetesExecutor worker, where it would get stuck watching itself for
    logs. We will properly filter for KPO's only, and ignore
    KubernetesExecutor workers for good measure.
---
 airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py  | 10 +++++++---
 .../providers/cncf/kubernetes/operators/test_kubernetes_pod.py |  5 ++++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 8a79658381..d075915bf7 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -290,7 +290,12 @@ class KubernetesPodOperator(BaseOperator):
         ti = context['ti']
         run_id = context['run_id']
 
-        labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
+        labels = {
+            'dag_id': ti.dag_id,
+            'task_id': ti.task_id,
+            'run_id': run_id,
+            'kubernetes_pod_operator': 'True',
+        }
 
         # If running on Airflow 2.3+:
         map_index = getattr(ti, 'map_index', -1)
@@ -433,7 +438,7 @@ class KubernetesPodOperator(BaseOperator):
     def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
         labels = self._get_ti_pod_labels(context, include_try_number=False)
         label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
-        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True,!airflow-worker'
 
     def _set_name(self, name):
         if name is None:
@@ -541,7 +546,6 @@ class KubernetesPodOperator(BaseOperator):
         pod.metadata.labels.update(
             {
                 'airflow_version': airflow_version.replace('+', '-'),
-                'kubernetes_pod_operator': 'True',
             }
         )
         pod_mutation_hook(pod)
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index ee2a3cdb43..e70bf88326 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -228,7 +228,10 @@ class TestKubernetesPodOperator:
         self.run_pod(k)
         self.client_mock.return_value.list_namespaced_pod.assert_called_once()
         _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
-        assert kwargs['label_selector'] == 'dag_id=dag,run_id=test,task_id=task,already_checked!=True'
+        assert kwargs['label_selector'] == (
+            'dag_id=dag,kubernetes_pod_operator=True,run_id=test,task_id=task,'
+            'already_checked!=True,!airflow-worker'
+        )
 
     def test_image_pull_secrets_correctly_set(self):
         fake_pull_secrets = "fakeSecret"