Posted to commits@airflow.apache.org by po...@apache.org on 2023/02/25 21:50:45 UTC
[airflow] branch main updated: Fixing Task Duration view in case of manual DAG runs only (#22015) (#29195)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b8552f5c4 Fixing Task Duration view in case of manual DAG runs only (#22015) (#29195)
8b8552f5c4 is described below
commit 8b8552f5c4111fe0732067d7af06aa5285498a79
Author: Max <ri...@googlemail.com>
AuthorDate: Sat Feb 25 22:50:37 2023 +0100
Fixing Task Duration view in case of manual DAG runs only (#22015) (#29195)
* Fixing Task Duration view in case of manual DAG runs only (#22015)
Currently, if a DAG's history consists solely of manual runs, they cannot be filtered down by `num` in the duration view, because in this case min_date defaults to timezone.utc_epoch(), the start of the Unix epoch. Consequently, every DAG run is displayed, however small `num` is.
With the implemented fallback, manual-only DAG runs now behave the same way as mixed (scheduled + manual) DAG runs, whose behavior stays unchanged. It was implemented this way to avoid breaking changes.
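To make the bug concrete, here is a minimal sketch that models DagRun rows as hypothetical `(execution_date, run_type)` tuples and mimics the old query in plain Python (it does not call Airflow, and it counts runs rather than task instances). With a manual-only history, the non-manual filter leaves no rows, min_date falls back to the epoch, and the resulting window covers every run:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical model of DagRun rows as (execution_date, run_type) tuples;
# this only mimics the old query in DAG.get_task_instances_before.
UTC_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def old_min_date(runs, base_date, num):
    """Mimic: non-manual runs <= base_date, newest first, OFFSET num, LIMIT 1."""
    candidates = sorted(
        (d for d, run_type in runs if d <= base_date and run_type != "manual"),
        reverse=True,
    )
    min_date = candidates[num] if len(candidates) > num else None
    # Old fallback: no row found -> start of the Unix epoch.
    return UTC_EPOCH if min_date is None else min_date


base = datetime(2022, 7, 20, 20, tzinfo=timezone.utc)
manual_only = [(base - timedelta(hours=h), "manual") for h in range(1, 9)]

min_date = old_min_date(manual_only, base, num=3)
shown = [d for d, _ in manual_only if min_date <= d <= base]
print(min_date == UTC_EPOCH, len(shown))  # True 8: all runs shown, not 3
```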
* Fixing get_task_instances_before method and adding a unit test
With this new fix, min_date is determined by querying the execution_date
values, limited to `num` rows, and taking the last (i.e. oldest) element
of the result. If no rows are found, the fallback value is simply
base_date. This way, the invoked get_task_instances cannot default to
30 days prior to the current time.
Furthermore, the query itself is no longer confined to non-manual runs,
but includes all DAGRun types.
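The new determination of min_date can be sketched the same way. This again assumes hypothetical `(execution_date, run_type)` tuples instead of real DagRun rows and a hypothetical `new_min_date` helper: take at most `num` execution dates of any run type at or before base_date, newest first, use the last one as the window start, and fall back to base_date when nothing is found.

```python
from datetime import datetime, timedelta, timezone


def new_min_date(runs, base_date, num):
    """Mimic the new query: any run type <= base_date, newest first, LIMIT num.

    Returns the oldest of those (at most) ``num`` dates, or ``base_date``
    when there are none (hypothetical helper, not Airflow code).
    """
    dates = sorted((d for d, _ in runs if d <= base_date), reverse=True)[:num]
    return dates[-1] if dates else base_date


base = datetime(2022, 7, 20, 20, tzinfo=timezone.utc)
# Four manual runs (H19..H16) followed by four scheduled ones (H15..H12),
# mirroring the layout used in the new unit test.
runs = [(base - timedelta(hours=h), "manual" if h <= 4 else "scheduled") for h in range(1, 9)]

for num in (1, 3, 5, 10):
    min_date = new_min_date(runs, base, num)
    shown = [d for d, _ in runs if min_date <= d <= base]
    print(num, len(shown))  # 1 1, 3 3, 5 5, then 10 8 (only 8 runs exist)
```

With `num=0` the slice is empty, so the window collapses to `[base_date, base_date]`; this matches the new unit test, where `num=0` at `BASE_DATE - 8h` still returns the run at exactly that time.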
More context:
While writing the tests, it became evident
that the current implementation of the get_task_instances_before
method has two problems:
1) With the offset + limit determination of min_date, the query can
return no result at all:
* either because there are fewer DAGRuns in the DB than `num`, or
* because the query is restricted to non-manual runs (see below).
Consequently, the invoked get_task_instances then falls back to 30 days
before the current time, an entirely different timeline from the one
selected in the UI (execution_date).
2) Because manual runs are explicitly excluded from the query, with n
manual runs and 1 scheduled run before base_date (e.g.
S-M-M-...-M-M-BD), the scheduled run plus all n manual runs are
returned. For DAGs with mostly triggered/manual DAGRuns, this displays
more results than expected. For DAGs with only manual DAGRuns, the
method returns no results and hence uses the fallback mentioned above,
which operates on a different timeline. Prior to this fix, the fallback
was even the start of the Unix epoch (timezone.utc_epoch()), i.e. all
DAGRuns in the DB were shown.
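Both problems of the old query can be reproduced with a plain-Python model: hypothetical `(execution_date, run_type)` tuples and a hypothetical `old_min_date` helper, not Airflow code. The second case uses a variant of the S-M-M-...-M-M-BD layout with two older scheduled runs, so that `OFFSET 1` still finds a row:

```python
from datetime import datetime, timedelta, timezone


def old_min_date(runs, base_date, num):
    """Mimic the old query: skip manual runs, newest first, OFFSET num, LIMIT 1, scalar()."""
    dates = sorted(
        (d for d, run_type in runs if d <= base_date and run_type != "manual"),
        reverse=True,
    )
    # scalar() yields None when OFFSET skips every remaining row.
    return dates[num] if len(dates) > num else None


base = datetime(2022, 7, 20, 20, tzinfo=timezone.utc)

# Problem 1: fewer scheduled runs than `num`, so the query finds nothing.
few_runs = [(base - timedelta(hours=h), "scheduled") for h in (1, 2, 3)]
print(old_min_date(few_runs, base, num=5))  # None

# Problem 2: S-S-M-...-M-BD. Manual runs are ignored when picking min_date,
# but they still fall inside the resulting window.
mixed = [(base - timedelta(hours=h), "scheduled") for h in (9, 10)]
mixed += [(base - timedelta(hours=h), "manual") for h in range(1, 9)]
min_date = old_min_date(mixed, base, num=1)
print(sum(1 for d, _ in mixed if min_date <= d <= base))  # 10, although num=1
```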
With the previously suggested non-breaking fix, this scenario could not
be avoided: as soon as any scheduled run was found, the fallback was
never reached.
The suggested changes may be considered breaking. However, they are
confined to the get_task_instances_before method, which is only used in
views.py for the duration, tries and landing times views, i.e. exactly
where the changes are meant to fix the existing behavior.
* Update airflow/models/dag.py
Co-authored-by: Andrey Anshin <An...@taragol.is>
* Update airflow/models/dag.py
Co-authored-by: Andrey Anshin <An...@taragol.is>
* Update airflow/models/dag.py
Co-authored-by: Andrey Anshin <An...@taragol.is>
---------
Co-authored-by: Andrey Anshin <An...@taragol.is>
---
airflow/models/dag.py | 26 +++++-----
tests/models/test_dag.py | 120 +++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 135 insertions(+), 11 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index fc9fbcdaa0..53681f6fb6 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1501,25 +1501,28 @@ class DAG(LoggingMixin):
) -> list[TaskInstance]:
"""Get ``num`` task instances before (including) ``base_date``.
- The returned list may contain exactly ``num`` task instances. It can
- have less if there are less than ``num`` scheduled DAG runs before
- ``base_date``, or more if there are manual task runs between the
- requested period, which does not count toward ``num``.
+ The returned list may contain exactly ``num`` task instances
+ corresponding to any DagRunType. It can have less if there are
+ less than ``num`` scheduled DAG runs before ``base_date``.
"""
- min_date: datetime | None = (
+ execution_dates: list[Any] = (
session.query(DagRun.execution_date)
.filter(
DagRun.dag_id == self.dag_id,
DagRun.execution_date <= base_date,
- DagRun.run_type != DagRunType.MANUAL,
)
.order_by(DagRun.execution_date.desc())
- .offset(num)
- .limit(1)
- .scalar()
+ .limit(num)
+ .all()
)
- if min_date is None:
- min_date = timezone.utc_epoch()
+
+ if len(execution_dates) == 0:
+ return self.get_task_instances(start_date=base_date, end_date=base_date, session=session)
+
+ min_date: datetime | None = execution_dates[-1]._mapping.get(
+ "execution_date"
+ ) # getting the last value from the list
+
return self.get_task_instances(start_date=min_date, end_date=base_date, session=session)
@provide_session
@@ -1534,6 +1537,7 @@ class DAG(LoggingMixin):
start_date = (timezone.utcnow() - timedelta(30)).replace(
hour=0, minute=0, second=0, microsecond=0
)
+
query = self._get_task_instances(
task_ids=None,
start_date=start_date,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index a99e1b585f..d5a8de4864 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -470,6 +470,126 @@ class TestDag:
)
session.close()
+ def test_get_task_instances_before(self):
+
+ BASE_DATE = timezone.datetime(2022, 7, 20, 20)
+
+ test_dag_id = "test_get_task_instances_before"
+ test_task_id = "the_task"
+
+ test_dag = DAG(dag_id=test_dag_id, start_date=BASE_DATE)
+ EmptyOperator(task_id=test_task_id, dag=test_dag)
+
+ session = settings.Session()
+
+ def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED):
+ dagrun = test_dag.create_dagrun(
+ state=State.SUCCESS, run_type=type, run_id=f"test_{delta_h}", session=session
+ )
+ dagrun.start_date = BASE_DATE + timedelta(hours=delta_h)
+ dagrun.execution_date = BASE_DATE + timedelta(hours=delta_h)
+ return dagrun
+
+ dr1 = dag_run_before(delta_h=-1, type=DagRunType.MANUAL) # H19
+ dr2 = dag_run_before(delta_h=-2, type=DagRunType.MANUAL) # H18
+ dr3 = dag_run_before(delta_h=-3, type=DagRunType.MANUAL) # H17
+ dr4 = dag_run_before(delta_h=-4, type=DagRunType.MANUAL) # H16
+ dr5 = dag_run_before(delta_h=-5) # H15
+ dr6 = dag_run_before(delta_h=-6) # H14
+ dr7 = dag_run_before(delta_h=-7) # H13
+ dr8 = dag_run_before(delta_h=-8) # H12
+
+ session.commit()
+
+ REF_DATE = BASE_DATE
+
+ assert set([dr.run_id for dr in [dr1]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=1, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr1, dr2, dr3]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=3, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=5, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5, dr6, dr7]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=7, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5, dr6, dr7, dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=9, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5, dr6, dr7, dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=10, session=session)
+ ]
+ ) # stays constrained to available ones
+
+ REF_DATE = BASE_DATE + timedelta(hours=-3.5)
+
+ assert set([dr.run_id for dr in [dr4]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=1, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr4, dr5, dr6]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=3, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr4, dr5, dr6, dr7, dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=5, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr4, dr5, dr6, dr7, dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=6, session=session)
+ ]
+ ) # stays constrained to available ones
+
+ REF_DATE = BASE_DATE + timedelta(hours=-8)
+
+ assert set([dr.run_id for dr in [dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=0, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=1, session=session)
+ ]
+ )
+ assert set([dr.run_id for dr in [dr8]]) == set(
+ [
+ ti.run_id
+ for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=10, session=session)
+ ]
+ )
+
+ session.close()
+
def test_user_defined_filters_macros(self):
def jinja_udf(name):
return f"Hello {name}"