You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/26 14:41:52 UTC

[airflow] branch v2-0-test updated: Fix support for long dag_id and task_id in KubernetesExecutor (#14703)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-0-test by this push:
     new ee1026f  Fix support for long dag_id and task_id in KubernetesExecutor (#14703)
ee1026f is described below

commit ee1026ff384c2abf558447225d68b4ef03a6d09e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Mar 26 08:41:18 2021 -0600

    Fix support for long dag_id and task_id in KubernetesExecutor (#14703)
    
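    The key used to remove a task from executor.running is reconstituted
    from pod annotations, so make sure the full, untruncated dag_id and
    task_id are stored in the annotations. Previously both ids were passed
    through make_safe_label_value before the pod metadata was built; now
    only the label values are sanitized (labels must stay within 63
    characters), and the annotations keep the original ids.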
    
    (cherry picked from commit b5e7ada34536259e21fca5032ef67b5e33722c05)
---
 airflow/kubernetes/pod_generator.py    | 10 +++-------
 tests/kubernetes/test_pod_generator.py |  3 +++
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index da2001b..42394ba 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -368,10 +368,6 @@ class PodGenerator:
         except Exception:  # pylint: disable=W0703
             image = kube_image
 
-        task_id = make_safe_label_value(task_id)
-        dag_id = make_safe_label_value(dag_id)
-        scheduler_job_id = make_safe_label_value(str(scheduler_job_id))
-
         dynamic_pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
                 namespace=namespace,
@@ -383,9 +379,9 @@ class PodGenerator:
                 },
                 name=PodGenerator.make_unique_pod_id(pod_id),
                 labels={
-                    'airflow-worker': scheduler_job_id,
-                    'dag_id': dag_id,
-                    'task_id': task_id,
+                    'airflow-worker': make_safe_label_value(str(scheduler_job_id)),
+                    'dag_id': make_safe_label_value(dag_id),
+                    'task_id': make_safe_label_value(task_id),
                     'execution_date': datetime_to_label_safe_datestring(date),
                     'try_number': str(try_number),
                     'airflow_version': airflow_version.replace('+', '-'),
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 17a942a..da2a488 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -500,6 +500,9 @@ class TestPodGenerator(unittest.TestCase):
         for _, v in result.metadata.labels.items():
             assert len(v) <= 63
 
+        assert 'a' * 512 == result.metadata.annotations['dag_id']
+        assert 'a' * 512 == result.metadata.annotations['task_id']
+
     def test_merge_objects_empty(self):
         annotations = {'foo1': 'bar1'}
         base_obj = k8s.V1ObjectMeta(annotations=annotations)