You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/12 15:59:34 UTC
[airflow] 04/04: Fix `dags_needing_dagruns` dataset info timestamp (#26288)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c642e1a8373173355013700aa614c87202b2e47f
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Sep 12 01:05:28 2022 -0700
Fix `dags_needing_dagruns` dataset info timestamp (#26288)
(cherry picked from commit 3c9c0f940b67c25285259541478ebb413b94a73a)
---
airflow/models/dag.py | 8 +++-----
tests/models/test_dag.py | 48 +++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 50 insertions(+), 6 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e761352c8c..be400f9427 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3238,15 +3238,13 @@ class DagModel(Base):
you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
transaction is committed it will be unlocked.
"""
- from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ
-
# these dag ids are triggered by datasets, and they are ready to go.
dataset_triggered_dag_info_list = {
- x.dag_id: (x.first_event_time, x.last_event_time)
+ x.dag_id: (x.first_queued_time, x.last_queued_time)
for x in session.query(
DagScheduleDatasetReference.dag_id,
- func.max(DDRQ.created_at).label('last_event_time'),
- func.max(DDRQ.created_at).label('first_event_time'),
+ func.max(DDRQ.created_at).label('last_queued_time'),
+ func.min(DDRQ.created_at).label('first_queued_time'),
)
.join(DagScheduleDatasetReference.queue_records, isouter=True)
.group_by(DagScheduleDatasetReference.dag_id)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f0f53db8fa..0070c54c38 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -45,7 +45,7 @@ from airflow.exceptions import AirflowException, DuplicateTaskIdFound, ParamVali
from airflow.models import DAG, DagModel, DagRun, DagTag, TaskFail, TaskInstance as TI
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DagOwnerAttributes, dag as dag_decorator, get_dataset_triggered_next_run_info
-from airflow.models.dataset import DatasetDagRunQueue, DatasetModel, TaskOutletDatasetReference
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel, TaskOutletDatasetReference
from airflow.models.param import DagParam, Param, ParamsDict
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
@@ -2220,6 +2220,52 @@ class TestDagModel:
assert dag.relative_fileloc == expected_relative
+ @pytest.mark.need_serialized_dag
+ def test_dags_needing_dagruns_dataset_triggered_dag_info_queued_times(self, session, dag_maker):
+ dataset1 = Dataset(uri="ds1")
+ dataset2 = Dataset(uri="ds2")
+
+ for dag_id, dataset in [("datasets-1", dataset1), ("datasets-2", dataset2)]:
+ with dag_maker(dag_id=dag_id, start_date=timezone.utcnow(), session=session):
+ EmptyOperator(task_id="task", outlets=[dataset])
+ dr = dag_maker.create_dagrun()
+
+ ds_id = session.query(DatasetModel.id).filter_by(uri=dataset.uri).scalar()
+
+ session.add(
+ DatasetEvent(
+ dataset_id=ds_id,
+ source_task_id="task",
+ source_dag_id=dr.dag_id,
+ source_run_id=dr.run_id,
+ source_map_index=-1,
+ )
+ )
+
+ ds1_id = session.query(DatasetModel.id).filter_by(uri=dataset1.uri).scalar()
+ ds2_id = session.query(DatasetModel.id).filter_by(uri=dataset2.uri).scalar()
+
+ with dag_maker(dag_id="datasets-consumer-multiple", schedule=[dataset1, dataset2]) as dag:
+ pass
+
+ session.flush()
+ session.add_all(
+ [
+ DatasetDagRunQueue(dataset_id=ds1_id, target_dag_id=dag.dag_id, created_at=DEFAULT_DATE),
+ DatasetDagRunQueue(
+ dataset_id=ds2_id, target_dag_id=dag.dag_id, created_at=DEFAULT_DATE + timedelta(hours=1)
+ ),
+ ]
+ )
+ session.flush()
+
+ query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+ assert 1 == len(dataset_triggered_dag_info)
+ assert dag.dag_id in dataset_triggered_dag_info
+ first_queued_time, last_queued_time = dataset_triggered_dag_info[dag.dag_id]
+ assert first_queued_time == DEFAULT_DATE
+ assert last_queued_time == DEFAULT_DATE + timedelta(hours=1)
+
class TestQueries:
def setup_method(self) -> None: