You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/03/03 16:58:05 UTC
[airflow] branch main updated: Change KubePodOperator labels from execution_date to run_id (#21960)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8d8d072 Change KubePodOperator labels from execution_date to run_id (#21960)
8d8d072 is described below
commit 8d8d07228907d32403056af7acb3b2da003a7542
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Thu Mar 3 16:57:18 2022 +0000
Change KubePodOperator labels from execution_date to run_id (#21960)
Now that execution_date isn't the PK for TaskInstance we should replace
it with run_id.
There is no backwards compatibility concern here, as these labels are
only needed for re-attaching to a container, and the way we get the
run_id is compatible back to 2.1.
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 3 ++-
kubernetes_tests/test_kubernetes_pod_operator.py | 19 ++++++++++++-------
.../test_kubernetes_pod_operator_backcompat.py | 13 +++++++++----
.../cncf/kubernetes/operators/test_kubernetes_pod.py | 17 +++++++----------
4 files changed, 30 insertions(+), 22 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 180d0c9..7ca930b 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -283,8 +283,9 @@ class KubernetesPodOperator(BaseOperator):
return {}
ti = context['ti']
+ run_id = getattr(ti, 'run_id') or context['run_id']
- labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'execution_date': context['ts']}
+ labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
# If running on Airflow 2.3+:
map_index = getattr(ti, 'map_index', -1)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 9c85599..58c75f1 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -39,6 +39,7 @@ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import Kubernete
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
+from airflow.utils.types import DagRunType
from airflow.version import version as airflow_version
@@ -46,14 +47,18 @@ def create_context(task):
dag = DAG(dag_id="dag")
tzinfo = pendulum.timezone("Europe/Amsterdam")
execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
- dag_run = DagRun(dag_id=dag.dag_id, execution_date=execution_date)
+ dag_run = DagRun(
+ dag_id=dag.dag_id,
+ execution_date=execution_date,
+ run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
+ )
task_instance = TaskInstance(task=task)
task_instance.dag_run = dag_run
task_instance.dag_id = dag.dag_id
task_instance.xcom_push = mock.Mock()
return {
"dag": dag,
- "ts": execution_date.isoformat(),
+ "run_id": dag_run.run_id,
"task": task,
"ti": task_instance,
"task_instance": task_instance,
@@ -84,7 +89,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'foo': 'bar',
'kubernetes_pod_operator': 'True',
'airflow_version': airflow_version.replace('+', '-'),
- 'execution_date': '2016-01-01T0100000100-a2f50a31f',
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'dag_id': 'dag',
'task_id': ANY,
'try_number': '1',
@@ -670,7 +675,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'foo': 'bar',
'airflow_version': mock.ANY,
'dag_id': 'dag',
- 'execution_date': mock.ANY,
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
@@ -709,7 +714,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'foo': 'bar',
'airflow_version': mock.ANY,
'dag_id': 'dag',
- 'execution_date': mock.ANY,
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
@@ -751,7 +756,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'foo': 'bar',
'airflow_version': mock.ANY,
'dag_id': 'dag',
- 'execution_date': mock.ANY,
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
@@ -855,7 +860,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'annotations': {},
'labels': {
'dag_id': 'dag',
- 'execution_date': mock.ANY,
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
index 441f6fd..8fc4fe1 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
@@ -39,6 +39,7 @@ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import Kubernete
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
+from airflow.utils.types import DagRunType
from airflow.version import version as airflow_version
# noinspection DuplicatedCode
@@ -48,14 +49,18 @@ def create_context(task):
dag = DAG(dag_id="dag")
tzinfo = pendulum.timezone("Europe/Amsterdam")
execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
- dag_run = DagRun(dag_id=dag.dag_id, execution_date=execution_date)
+ dag_run = DagRun(
+ dag_id=dag.dag_id,
+ execution_date=execution_date,
+ run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
+ )
task_instance = TaskInstance(task=task)
task_instance.dag_run = dag_run
task_instance.dag_id = dag.dag_id
task_instance.xcom_push = mock.Mock()
return {
"dag": dag,
- "ts": execution_date.isoformat(),
+ "run_id": dag_run.run_id,
"task": task,
"ti": task_instance,
"task_instance": task_instance,
@@ -82,7 +87,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'foo': 'bar',
'kubernetes_pod_operator': 'True',
'airflow_version': airflow_version.replace('+', '-'),
- 'execution_date': '2016-01-01T0100000100-a2f50a31f',
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'dag_id': 'dag',
'task_id': 'task',
'try_number': '1',
@@ -562,7 +567,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'foo': 'bar',
'airflow_version': mock.ANY,
'dag_id': 'dag',
- 'execution_date': mock.ANY,
+ 'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 59e4db5..e20aba8 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -175,7 +175,7 @@ class TestKubernetesPodOperator:
"task_id": "task",
"try_number": "1",
"airflow_version": mock.ANY,
- "execution_date": mock.ANY,
+ "run_id": "test",
}
def test_labels_mapped(self):
@@ -193,7 +193,7 @@ class TestKubernetesPodOperator:
"task_id": "task",
"try_number": "1",
"airflow_version": mock.ANY,
- "execution_date": mock.ANY,
+ "run_id": "test",
"map_index": "10",
}
@@ -211,10 +211,7 @@ class TestKubernetesPodOperator:
self.run_pod(k)
self.client_mock.return_value.list_namespaced_pod.assert_called_once()
_, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
- assert (
- kwargs['label_selector']
- == 'dag_id=dag,execution_date=2016-01-01T0100000000-26816529d,task_id=task,already_checked!=True'
- )
+ assert kwargs['label_selector'] == 'dag_id=dag,run_id=test,task_id=task,already_checked!=True'
def test_image_pull_secrets_correctly_set(self):
fake_pull_secrets = "fakeSecret"
@@ -353,7 +350,7 @@ class TestKubernetesPodOperator:
"task_id": "task",
"try_number": "1",
"airflow_version": mock.ANY,
- "execution_date": mock.ANY,
+ "run_id": "test",
}
@pytest.mark.parametrize(("randomize_name",), ([True], [False]))
@@ -391,7 +388,7 @@ class TestKubernetesPodOperator:
"task_id": "task",
"try_number": "1",
"airflow_version": mock.ANY,
- "execution_date": mock.ANY,
+ "run_id": "test",
}
@pytest.fixture
@@ -461,7 +458,7 @@ class TestKubernetesPodOperator:
"task_id": "task",
"try_number": "1",
"airflow_version": mock.ANY,
- "execution_date": mock.ANY,
+ "run_id": "test",
}
assert pod.metadata.namespace == "mynamespace"
assert pod.spec.containers[0].image == "ubuntu:16.04"
@@ -530,7 +527,7 @@ class TestKubernetesPodOperator:
"task_id": "task",
"try_number": "1",
"airflow_version": mock.ANY,
- "execution_date": mock.ANY,
+ "run_id": "test",
}
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.follow_container_logs")