You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/03/03 16:58:05 UTC

[airflow] branch main updated: Change KubePodOperator labels from execution_date to run_id (#21960)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d8d072  Change KubePodOperator labels from execution_date to run_id (#21960)
8d8d072 is described below

commit 8d8d07228907d32403056af7acb3b2da003a7542
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Thu Mar 3 16:57:18 2022 +0000

    Change KubePodOperator labels from execution_date to run_id (#21960)
    
    Now that execution_date isn't the PK for TaskInstance we should replace
    it with run_id.
    
    There is no backwards compatibility concern here, as these labels are
    only needed for re-attaching to a container, and the way we get the
    run_id is compatible back to 2.1
---
 .../cncf/kubernetes/operators/kubernetes_pod.py       |  3 ++-
 kubernetes_tests/test_kubernetes_pod_operator.py      | 19 ++++++++++++-------
 .../test_kubernetes_pod_operator_backcompat.py        | 13 +++++++++----
 .../cncf/kubernetes/operators/test_kubernetes_pod.py  | 17 +++++++----------
 4 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 180d0c9..7ca930b 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -283,8 +283,9 @@ class KubernetesPodOperator(BaseOperator):
             return {}
 
         ti = context['ti']
+        run_id = getattr(ti, 'run_id') or context['run_id']
 
-        labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'execution_date': context['ts']}
+        labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
 
         # If running on Airflow 2.3+:
         map_index = getattr(ti, 'map_index', -1)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 9c85599..58c75f1 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -39,6 +39,7 @@ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import Kubernete
 from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
 from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
 from airflow.utils import timezone
+from airflow.utils.types import DagRunType
 from airflow.version import version as airflow_version
 
 
@@ -46,14 +47,18 @@ def create_context(task):
     dag = DAG(dag_id="dag")
     tzinfo = pendulum.timezone("Europe/Amsterdam")
     execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
-    dag_run = DagRun(dag_id=dag.dag_id, execution_date=execution_date)
+    dag_run = DagRun(
+        dag_id=dag.dag_id,
+        execution_date=execution_date,
+        run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
+    )
     task_instance = TaskInstance(task=task)
     task_instance.dag_run = dag_run
     task_instance.dag_id = dag.dag_id
     task_instance.xcom_push = mock.Mock()
     return {
         "dag": dag,
-        "ts": execution_date.isoformat(),
+        "run_id": dag_run.run_id,
         "task": task,
         "ti": task_instance,
         "task_instance": task_instance,
@@ -84,7 +89,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                     'foo': 'bar',
                     'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-'),
-                    'execution_date': '2016-01-01T0100000100-a2f50a31f',
+                    'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
                     'dag_id': 'dag',
                     'task_id': ANY,
                     'try_number': '1',
@@ -670,7 +675,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'foo': 'bar',
             'airflow_version': mock.ANY,
             'dag_id': 'dag',
-            'execution_date': mock.ANY,
+            'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
             'kubernetes_pod_operator': 'True',
             'task_id': mock.ANY,
             'try_number': '1',
@@ -709,7 +714,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'foo': 'bar',
             'airflow_version': mock.ANY,
             'dag_id': 'dag',
-            'execution_date': mock.ANY,
+            'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
             'kubernetes_pod_operator': 'True',
             'task_id': mock.ANY,
             'try_number': '1',
@@ -751,7 +756,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'foo': 'bar',
             'airflow_version': mock.ANY,
             'dag_id': 'dag',
-            'execution_date': mock.ANY,
+            'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
             'kubernetes_pod_operator': 'True',
             'task_id': mock.ANY,
             'try_number': '1',
@@ -855,7 +860,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 'annotations': {},
                 'labels': {
                     'dag_id': 'dag',
-                    'execution_date': mock.ANY,
+                    'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
                     'kubernetes_pod_operator': 'True',
                     'task_id': mock.ANY,
                     'try_number': '1',
diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
index 441f6fd..8fc4fe1 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
@@ -39,6 +39,7 @@ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import Kubernete
 from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
 from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
 from airflow.utils import timezone
+from airflow.utils.types import DagRunType
 from airflow.version import version as airflow_version
 
 # noinspection DuplicatedCode
@@ -48,14 +49,18 @@ def create_context(task):
     dag = DAG(dag_id="dag")
     tzinfo = pendulum.timezone("Europe/Amsterdam")
     execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
-    dag_run = DagRun(dag_id=dag.dag_id, execution_date=execution_date)
+    dag_run = DagRun(
+        dag_id=dag.dag_id,
+        execution_date=execution_date,
+        run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
+    )
     task_instance = TaskInstance(task=task)
     task_instance.dag_run = dag_run
     task_instance.dag_id = dag.dag_id
     task_instance.xcom_push = mock.Mock()
     return {
         "dag": dag,
-        "ts": execution_date.isoformat(),
+        "run_id": dag_run.run_id,
         "task": task,
         "ti": task_instance,
         "task_instance": task_instance,
@@ -82,7 +87,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                     'foo': 'bar',
                     'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-'),
-                    'execution_date': '2016-01-01T0100000100-a2f50a31f',
+                    'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
                     'dag_id': 'dag',
                     'task_id': 'task',
                     'try_number': '1',
@@ -562,7 +567,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'foo': 'bar',
             'airflow_version': mock.ANY,
             'dag_id': 'dag',
-            'execution_date': mock.ANY,
+            'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
             'kubernetes_pod_operator': 'True',
             'task_id': mock.ANY,
             'try_number': '1',
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 59e4db5..e20aba8 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -175,7 +175,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "execution_date": mock.ANY,
+            "run_id": "test",
         }
 
     def test_labels_mapped(self):
@@ -193,7 +193,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "execution_date": mock.ANY,
+            "run_id": "test",
             "map_index": "10",
         }
 
@@ -211,10 +211,7 @@ class TestKubernetesPodOperator:
         self.run_pod(k)
         self.client_mock.return_value.list_namespaced_pod.assert_called_once()
         _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
-        assert (
-            kwargs['label_selector']
-            == 'dag_id=dag,execution_date=2016-01-01T0100000000-26816529d,task_id=task,already_checked!=True'
-        )
+        assert kwargs['label_selector'] == 'dag_id=dag,run_id=test,task_id=task,already_checked!=True'
 
     def test_image_pull_secrets_correctly_set(self):
         fake_pull_secrets = "fakeSecret"
@@ -353,7 +350,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "execution_date": mock.ANY,
+            "run_id": "test",
         }
 
     @pytest.mark.parametrize(("randomize_name",), ([True], [False]))
@@ -391,7 +388,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "execution_date": mock.ANY,
+            "run_id": "test",
         }
 
     @pytest.fixture
@@ -461,7 +458,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "execution_date": mock.ANY,
+            "run_id": "test",
         }
         assert pod.metadata.namespace == "mynamespace"
         assert pod.spec.containers[0].image == "ubuntu:16.04"
@@ -530,7 +527,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "execution_date": mock.ANY,
+            "run_id": "test",
         }
 
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.follow_container_logs")