Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/12 15:59:34 UTC

[airflow] 04/04: Fix `dags_needing_dagruns` dataset info timestamp (#26288)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c642e1a8373173355013700aa614c87202b2e47f
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Sep 12 01:05:28 2022 -0700

    Fix `dags_needing_dagruns` dataset info timestamp (#26288)
    
    (cherry picked from commit 3c9c0f940b67c25285259541478ebb413b94a73a)
---
 airflow/models/dag.py    |  8 +++-----
 tests/models/test_dag.py | 48 +++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e761352c8c..be400f9427 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3238,15 +3238,13 @@ class DagModel(Base):
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
         """
-        from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ
-
         # these dag ids are triggered by datasets, and they are ready to go.
         dataset_triggered_dag_info_list = {
-            x.dag_id: (x.first_event_time, x.last_event_time)
+            x.dag_id: (x.first_queued_time, x.last_queued_time)
             for x in session.query(
                 DagScheduleDatasetReference.dag_id,
-                func.max(DDRQ.created_at).label('last_event_time'),
-                func.max(DDRQ.created_at).label('first_event_time'),
+                func.max(DDRQ.created_at).label('last_queued_time'),
+                func.min(DDRQ.created_at).label('first_queued_time'),
             )
             .join(DagScheduleDatasetReference.queue_records, isouter=True)
             .group_by(DagScheduleDatasetReference.dag_id)
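The hunk above is the substance of the fix: the old query labeled both aggregates with func.max(DDRQ.created_at), so the reported "first" and "last" event times were always the same value. The corrected query takes func.min for first_queued_time and func.max for last_queued_time, with labels renamed to match. Below is a minimal standalone sketch of the same grouped min/max pattern; it uses a hypothetical queue table rather than Airflow's models and assumes SQLAlchemy 1.4+ (which provides declarative_base in sqlalchemy.orm and Session as a context manager).

    # Standalone sketch of the grouped min/max aggregation, not Airflow code.
    # Table and column names here are hypothetical stand-ins for
    # DatasetDagRunQueue.created_at grouped by the consuming DAG id.
    from datetime import datetime, timedelta

    from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class QueueRecord(Base):
        __tablename__ = "queue_record"
        id = Column(Integer, primary_key=True)
        target_dag_id = Column(String, nullable=False)
        created_at = Column(DateTime, nullable=False)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        t0 = datetime(2022, 9, 12)
        session.add_all(
            [
                QueueRecord(target_dag_id="consumer", created_at=t0),
                QueueRecord(target_dag_id="consumer", created_at=t0 + timedelta(hours=1)),
            ]
        )
        session.flush()

        # min() gives the earliest queued time, max() the latest; using max()
        # for both (the old behaviour) collapses them to the same value.
        info = {
            row.target_dag_id: (row.first_queued_time, row.last_queued_time)
            for row in session.query(
                QueueRecord.target_dag_id,
                func.min(QueueRecord.created_at).label("first_queued_time"),
                func.max(QueueRecord.created_at).label("last_queued_time"),
            ).group_by(QueueRecord.target_dag_id)
        }
        assert info["consumer"] == (t0, t0 + timedelta(hours=1))
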
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f0f53db8fa..0070c54c38 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -45,7 +45,7 @@ from airflow.exceptions import AirflowException, DuplicateTaskIdFound, ParamVali
 from airflow.models import DAG, DagModel, DagRun, DagTag, TaskFail, TaskInstance as TI
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DagOwnerAttributes, dag as dag_decorator, get_dataset_triggered_next_run_info
-from airflow.models.dataset import DatasetDagRunQueue, DatasetModel, TaskOutletDatasetReference
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel, TaskOutletDatasetReference
 from airflow.models.param import DagParam, Param, ParamsDict
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
@@ -2220,6 +2220,52 @@ class TestDagModel:
 
         assert dag.relative_fileloc == expected_relative
 
+    @pytest.mark.need_serialized_dag
+    def test_dags_needing_dagruns_dataset_triggered_dag_info_queued_times(self, session, dag_maker):
+        dataset1 = Dataset(uri="ds1")
+        dataset2 = Dataset(uri="ds2")
+
+        for dag_id, dataset in [("datasets-1", dataset1), ("datasets-2", dataset2)]:
+            with dag_maker(dag_id=dag_id, start_date=timezone.utcnow(), session=session):
+                EmptyOperator(task_id="task", outlets=[dataset])
+            dr = dag_maker.create_dagrun()
+
+            ds_id = session.query(DatasetModel.id).filter_by(uri=dataset.uri).scalar()
+
+            session.add(
+                DatasetEvent(
+                    dataset_id=ds_id,
+                    source_task_id="task",
+                    source_dag_id=dr.dag_id,
+                    source_run_id=dr.run_id,
+                    source_map_index=-1,
+                )
+            )
+
+        ds1_id = session.query(DatasetModel.id).filter_by(uri=dataset1.uri).scalar()
+        ds2_id = session.query(DatasetModel.id).filter_by(uri=dataset2.uri).scalar()
+
+        with dag_maker(dag_id="datasets-consumer-multiple", schedule=[dataset1, dataset2]) as dag:
+            pass
+
+        session.flush()
+        session.add_all(
+            [
+                DatasetDagRunQueue(dataset_id=ds1_id, target_dag_id=dag.dag_id, created_at=DEFAULT_DATE),
+                DatasetDagRunQueue(
+                    dataset_id=ds2_id, target_dag_id=dag.dag_id, created_at=DEFAULT_DATE + timedelta(hours=1)
+                ),
+            ]
+        )
+        session.flush()
+
+        query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+        assert 1 == len(dataset_triggered_dag_info)
+        assert dag.dag_id in dataset_triggered_dag_info
+        first_queued_time, last_queued_time = dataset_triggered_dag_info[dag.dag_id]
+        assert first_queued_time == DEFAULT_DATE
+        assert last_queued_time == DEFAULT_DATE + timedelta(hours=1)
+
 
 class TestQueries:
     def setup_method(self) -> None:
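
The new test pins the behaviour: with two DatasetDagRunQueue rows created an hour apart, dags_needing_dagruns now reports the earlier created_at as the first queued time and the later one as the last. For illustration only, a hypothetical consumer of that mapping; the dict shape mirrors the test, but the literals and the loop are made up here, not taken from Airflow's scheduler:

    from datetime import datetime, timedelta

    # Hypothetical data mirroring the test: dag_id -> (first_queued_time, last_queued_time).
    # The timestamps below are illustrative, not Airflow's DEFAULT_DATE.
    first = datetime(2022, 9, 12, 0, 0)
    dataset_triggered_dag_info = {
        "datasets-consumer-multiple": (first, first + timedelta(hours=1)),
    }

    for dag_id, (first_queued, last_queued) in dataset_triggered_dag_info.items():
        # Before the fix both entries came from max(created_at), so this window was always zero.
        print(f"{dag_id}: dataset events queued across {last_queued - first_queued}")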