You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/26 14:41:52 UTC
[airflow] branch v2-0-test updated: Fix support for long dag_id and
task_id in KubernetesExecutor (#14703)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-0-test by this push:
new ee1026f Fix support for long dag_id and task_id in KubernetesExecutor (#14703)
ee1026f is described below
commit ee1026ff384c2abf558447225d68b4ef03a6d09e
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Mar 26 08:41:18 2021 -0600
Fix support for long dag_id and task_id in KubernetesExecutor (#14703)
The key used to remove a task from executor.running is reconstituted
from pod annotations, so make sure the full dag_id and task_id are in
the annotations.
(cherry picked from commit b5e7ada34536259e21fca5032ef67b5e33722c05)
---
airflow/kubernetes/pod_generator.py | 10 +++-------
tests/kubernetes/test_pod_generator.py | 3 +++
2 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index da2001b..42394ba 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -368,10 +368,6 @@ class PodGenerator:
except Exception: # pylint: disable=W0703
image = kube_image
- task_id = make_safe_label_value(task_id)
- dag_id = make_safe_label_value(dag_id)
- scheduler_job_id = make_safe_label_value(str(scheduler_job_id))
-
dynamic_pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
namespace=namespace,
@@ -383,9 +379,9 @@ class PodGenerator:
},
name=PodGenerator.make_unique_pod_id(pod_id),
labels={
- 'airflow-worker': scheduler_job_id,
- 'dag_id': dag_id,
- 'task_id': task_id,
+ 'airflow-worker': make_safe_label_value(str(scheduler_job_id)),
+ 'dag_id': make_safe_label_value(dag_id),
+ 'task_id': make_safe_label_value(task_id),
'execution_date': datetime_to_label_safe_datestring(date),
'try_number': str(try_number),
'airflow_version': airflow_version.replace('+', '-'),
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 17a942a..da2a488 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -500,6 +500,9 @@ class TestPodGenerator(unittest.TestCase):
for _, v in result.metadata.labels.items():
assert len(v) <= 63
+ assert 'a' * 512 == result.metadata.annotations['dag_id']
+ assert 'a' * 512 == result.metadata.annotations['task_id']
+
def test_merge_objects_empty(self):
annotations = {'foo1': 'bar1'}
base_obj = k8s.V1ObjectMeta(annotations=annotations)