Posted to commits@airflow.apache.org by po...@apache.org on 2023/02/25 21:50:45 UTC

[airflow] branch main updated: Fixing Task Duration view in case of manual DAG runs only (#22015) (#29195)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b8552f5c4 Fixing Task Duration view in case of manual DAG runs only (#22015) (#29195)
8b8552f5c4 is described below

commit 8b8552f5c4111fe0732067d7af06aa5285498a79
Author: Max <ri...@googlemail.com>
AuthorDate: Sat Feb 25 22:50:37 2023 +0100

    Fixing Task Duration view in case of manual DAG runs only (#22015) (#29195)
    
    * Fixing Task Duration view in case of manual DAG runs only (#22015)
    
    Currently, if a DAG's history consists solely of manual runs, they cannot be filtered down by `nums` in the duration view, because in this case the min_date defaults to the current utc_epoch. Consequently, an unlimited number of DAG runs is displayed.
    With the implemented fallback, the behavior for mixed (scheduled + manual) DAG runs is similar to the default behavior, as it is for manual-only runs. It was implemented this way to avoid breaking changes.
    
    * Fixing get_task_instances_before method and adding a unit test
    
    With this new fix, the min_date is determined from a set of
    execution_date values, obtained by a query limited by `num`, by taking
    the last element of that set. The fallback value if none are found is
    simply the base_date. This way, the invoked get_task_instances cannot
    default to 30 days prior to the current time.
    
    Furthermore, the query itself is no longer confined to non-manual runs,
    but includes all DAGRun types.
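
A minimal sketch of the selection logic described above, using plain Python
lists instead of a database query. The helper name `min_date_for_window` and
the sample data are hypothetical and only illustrate the idea; the actual
change is the query shown in the diff below.

```python
from datetime import datetime, timedelta


def min_date_for_window(execution_dates, base_date, num):
    """Return the start of the window covering the latest ``num`` runs up to ``base_date``."""
    # Stand-in for the query: filter <= base_date, order descending, limit num.
    candidates = sorted((d for d in execution_dates if d <= base_date), reverse=True)[:num]
    # Fallback: nothing found, so the window collapses to base_date itself.
    if not candidates:
        return base_date
    # The last element of the descending list is the oldest run in the window.
    return candidates[-1]


base = datetime(2022, 7, 20, 20)
# Eight hourly runs before base, regardless of run type (hypothetical data).
runs = [base - timedelta(hours=h) for h in range(1, 9)]

print(min_date_for_window(runs, base, 3))   # oldest of the three latest runs
print(min_date_for_window(runs, base, 10))  # capped at the oldest existing run
print(min_date_for_window([], base, 3))     # no runs: falls back to base
```

Because the limit is applied before the last element is taken, asking for more
runs than exist still yields a date taken from the existing runs instead of an
unrelated default.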
    
    More context:
    
    While writing the tests, it became evident
    that the current implementation of the get_task_instances_before
    method has two problems:
    1) With the limit + offset determination of the min_date, the query
    can return no result:
      * Either because there are fewer DAGRuns in the DB than `num`, or
      * Because of the restriction to non-manual runs (see more below).
    Consequently, the fallback in the invoked get_task_instances is then
    -30d from datetime.now(), an entirely different time dimension than
    the one selected in the UI (execution_date).
    2) With the explicit omission of manual runs in the query, in case of n
    manual runs and 1 scheduled one before the base_date (e.g.
    S-M-M-...-M-M-BD), the scheduled one + n manual runs are returned. In
    the case of mainly triggered/manual DAGRuns, this leads to more results
    displayed than expected. In the case of solely manual DAGRuns, the
    method will return no results and hence default to the above-mentioned
    fallback, which operates on a different timeline. Prior to this fix,
    there was even a fallback to the start of the current epochal period,
    i.e. showing all DAGRuns in the DB.
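
The first problem can be reproduced without a database. The sketch below
emulates the removed query (non-manual runs only, offset `num`, limit 1) on
plain Python data. The helper `old_min_date` and the string run types
"manual" and "scheduled" are hypothetical stand-ins, not Airflow APIs.

```python
from datetime import datetime, timedelta


def old_min_date(runs, base_date, num):
    """Emulate the removed query: non-manual runs only, offset ``num``, limit 1."""
    scheduled = sorted(
        (date for date, run_type in runs if run_type != "manual" and date <= base_date),
        reverse=True,
    )
    # offset(num).limit(1): at most one row, starting num rows into the result.
    page = scheduled[num:num + 1]
    # An empty page corresponds to the query returning no result (None).
    return page[0] if page else None


base = datetime(2022, 7, 20, 20)
manual_only = [(base - timedelta(hours=h), "manual") for h in range(1, 5)]
few_scheduled = [(base - timedelta(hours=h), "scheduled") for h in range(1, 3)]

print(old_min_date(manual_only, base, 2))    # None: manual runs are filtered out
print(old_min_date(few_scheduled, base, 5))  # None: fewer runs than the offset
print(old_min_date(few_scheduled, base, 1))  # a date: enough scheduled runs exist
```

In both `None` cases the caller has no date derived from the runs themselves,
which is why a fallback unrelated to the selected execution_date took over.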
    
    With the previously suggested non-breaking fix, it was not possible to
    avoid this scenario: whenever any scheduled run was found, the fallback
    was never reached.
    
    The suggested changes may be considered
    breaking. The changes are confined to the scope of the
    get_task_instances_before method, which is only used in views.py for
    the duration, tries and landing times views, i.e. exactly where the
    changes are meant to fix the existing behavior.
    
    * Update airflow/models/dag.py
    
    Co-authored-by: Andrey Anshin <An...@taragol.is>
    
    * Update airflow/models/dag.py
    
    Co-authored-by: Andrey Anshin <An...@taragol.is>
    
    * Update airflow/models/dag.py
    
    Co-authored-by: Andrey Anshin <An...@taragol.is>
    
    ---------
    
    Co-authored-by: Andrey Anshin <An...@taragol.is>
---
 airflow/models/dag.py    |  26 +++++-----
 tests/models/test_dag.py | 120 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 135 insertions(+), 11 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index fc9fbcdaa0..53681f6fb6 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1501,25 +1501,28 @@ class DAG(LoggingMixin):
     ) -> list[TaskInstance]:
         """Get ``num`` task instances before (including) ``base_date``.
 
-        The returned list may contain exactly ``num`` task instances. It can
-        have less if there are less than ``num`` scheduled DAG runs before
-        ``base_date``, or more if there are manual task runs between the
-        requested period, which does not count toward ``num``.
+        The returned list may contain exactly ``num`` task instances
+        corresponding to any DagRunType. It can have less if there are
+        less than ``num`` scheduled DAG runs before ``base_date``.
         """
-        min_date: datetime | None = (
+        execution_dates: list[Any] = (
             session.query(DagRun.execution_date)
             .filter(
                 DagRun.dag_id == self.dag_id,
                 DagRun.execution_date <= base_date,
-                DagRun.run_type != DagRunType.MANUAL,
             )
             .order_by(DagRun.execution_date.desc())
-            .offset(num)
-            .limit(1)
-            .scalar()
+            .limit(num)
+            .all()
         )
-        if min_date is None:
-            min_date = timezone.utc_epoch()
+
+        if len(execution_dates) == 0:
+            return self.get_task_instances(start_date=base_date, end_date=base_date, session=session)
+
+        min_date: datetime | None = execution_dates[-1]._mapping.get(
+            "execution_date"
+        )  # getting the last value from the list
+
         return self.get_task_instances(start_date=min_date, end_date=base_date, session=session)
 
     @provide_session
@@ -1534,6 +1537,7 @@ class DAG(LoggingMixin):
             start_date = (timezone.utcnow() - timedelta(30)).replace(
                 hour=0, minute=0, second=0, microsecond=0
             )
+
         query = self._get_task_instances(
             task_ids=None,
             start_date=start_date,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index a99e1b585f..d5a8de4864 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -470,6 +470,126 @@ class TestDag:
         )
         session.close()
 
+    def test_get_task_instances_before(self):
+
+        BASE_DATE = timezone.datetime(2022, 7, 20, 20)
+
+        test_dag_id = "test_get_task_instances_before"
+        test_task_id = "the_task"
+
+        test_dag = DAG(dag_id=test_dag_id, start_date=BASE_DATE)
+        EmptyOperator(task_id=test_task_id, dag=test_dag)
+
+        session = settings.Session()
+
+        def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED):
+            dagrun = test_dag.create_dagrun(
+                state=State.SUCCESS, run_type=type, run_id=f"test_{delta_h}", session=session
+            )
+            dagrun.start_date = BASE_DATE + timedelta(hours=delta_h)
+            dagrun.execution_date = BASE_DATE + timedelta(hours=delta_h)
+            return dagrun
+
+        dr1 = dag_run_before(delta_h=-1, type=DagRunType.MANUAL)  # H19
+        dr2 = dag_run_before(delta_h=-2, type=DagRunType.MANUAL)  # H18
+        dr3 = dag_run_before(delta_h=-3, type=DagRunType.MANUAL)  # H17
+        dr4 = dag_run_before(delta_h=-4, type=DagRunType.MANUAL)  # H16
+        dr5 = dag_run_before(delta_h=-5)  # H15
+        dr6 = dag_run_before(delta_h=-6)  # H14
+        dr7 = dag_run_before(delta_h=-7)  # H13
+        dr8 = dag_run_before(delta_h=-8)  # H12
+
+        session.commit()
+
+        REF_DATE = BASE_DATE
+
+        assert set([dr.run_id for dr in [dr1]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=1, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr1, dr2, dr3]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=3, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=5, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5, dr6, dr7]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=7, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5, dr6, dr7, dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=9, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr1, dr2, dr3, dr4, dr5, dr6, dr7, dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=10, session=session)
+            ]
+        )  # stays constrained to available ones
+
+        REF_DATE = BASE_DATE + timedelta(hours=-3.5)
+
+        assert set([dr.run_id for dr in [dr4]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=1, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr4, dr5, dr6]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=3, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr4, dr5, dr6, dr7, dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=5, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr4, dr5, dr6, dr7, dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=6, session=session)
+            ]
+        )  # stays constrained to available ones
+
+        REF_DATE = BASE_DATE + timedelta(hours=-8)
+
+        assert set([dr.run_id for dr in [dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=0, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=1, session=session)
+            ]
+        )
+        assert set([dr.run_id for dr in [dr8]]) == set(
+            [
+                ti.run_id
+                for ti in test_dag.get_task_instances_before(base_date=REF_DATE, num=10, session=session)
+            ]
+        )
+
+        session.close()
+
     def test_user_defined_filters_macros(self):
         def jinja_udf(name):
             return f"Hello {name}"