Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/12 15:59:33 UTC

[airflow] 03/04: Flush dataset events before queuing dagruns (#26276)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 05a6fcfd3e6fc7ec4274e06ef3a8cea1ee4bdecd
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Sep 9 13:15:26 2022 -0700

    Flush dataset events before queuing dagruns (#26276)
    
    When we go to schedule dagruns from the dataset dagrun queue, we assume
    the dataset events were created before their queue records, which isn't
    the case unless we explicitly flush the events first. This ensures that
    dagruns are properly related to their upstream dataset events.
    
    (cherry picked from commit 954349a952d929dc82087e4bb20d19736f84d381)
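
For context, here is a minimal, self-contained SQLAlchemy sketch of the
ordering problem the commit fixes. The Event and QueueRecord models are
illustrative stand-ins, not Airflow's real DatasetEvent /
DatasetDagRunQueue schema: Python-side column defaults such as utcnow are
only evaluated when a row is flushed, so flushing the event row first
guarantees its timestamp predates the queue row's.

    import datetime
    from sqlalchemy import Column, DateTime, Integer, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Event(Base):  # illustrative stand-in for DatasetEvent
        __tablename__ = "event"
        id = Column(Integer, primary_key=True)
        timestamp = Column(DateTime, default=datetime.datetime.utcnow)

    class QueueRecord(Base):  # illustrative stand-in for DatasetDagRunQueue
        __tablename__ = "queue_record"
        id = Column(Integer, primary_key=True)
        created_at = Column(DateTime, default=datetime.datetime.utcnow)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        event = Event()
        session.add(event)
        session.flush()  # INSERT emitted now; timestamp default evaluated here
        queue = QueueRecord()
        session.add(queue)
        session.flush()  # created_at evaluated on this later, separate flush
        assert event.timestamp <= queue.created_at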
---
 airflow/datasets/manager.py       | 1 +
 airflow/jobs/scheduler_job.py     | 1 +
 tests/models/test_taskinstance.py | 6 ++++++
 3 files changed, 8 insertions(+)

diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 83539d5965..f044510863 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -61,6 +61,7 @@ class DatasetManager(LoggingMixin):
                 extra=extra,
             )
         )
+        session.flush()
         if dataset_model.consuming_dags:
             self._queue_dagruns(dataset_model, session)
         session.flush()
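
Reusing the hypothetical Event model from the sketch above, the effect of
the added session.flush() is observable on the instance itself: until the
row is flushed, its timestamp default has not been evaluated, which is why
the flush must happen before _queue_dagruns inserts any queue records.

    with Session(engine) as session:
        event = Event()
        session.add(event)
        assert event.timestamp is None      # default not evaluated yet
        session.flush()                     # INSERT emitted inside the transaction
        assert event.timestamp is not None  # timestamp assigned before queueing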
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b175c6bfd7..5a24b530bf 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1135,6 +1135,7 @@ class SchedulerJob(BaseJob):
                 ]
                 if previous_dag_run:
                     dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
+
                 dataset_events = (
                     session.query(DatasetEvent)
                     .join(
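
For readers without the surrounding context (the hunk above is truncated
at the join), the scheduler builds its query from a list of filter
conditions and applies them together. A simplified, hypothetical
reconstruction of that pattern follows; the dataset_id condition and the
flattened query shape are assumptions, not the exact Airflow query:

    # Collect conditions first, then apply them all in one filter() call.
    dataset_event_filters = [DatasetEvent.dataset_id == dataset.id]  # assumed condition
    if previous_dag_run:
        # Only events newer than the previous run of this DAG are relevant.
        dataset_event_filters.append(
            DatasetEvent.timestamp > previous_dag_run.execution_date
        )
    dataset_events = (
        session.query(DatasetEvent)
        .filter(*dataset_event_filters)
        .all()
    )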
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 7b90ffd4cb..43f5db3e44 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1732,6 +1732,12 @@ class TestTaskInstance:
             DatasetEvent.source_task_instance == ti
         ).one() == ('s3://dag1/output_1.txt',)
 
+        # check that the dataset event has an earlier timestamp than the DDRQ's
+        ddrq_timestamps = (
+            session.query(DatasetDagRunQueue.created_at).filter_by(dataset_id=event.dataset.id).all()
+        )
+        assert all([event.timestamp < ddrq_timestamp for (ddrq_timestamp,) in ddrq_timestamps])
+
     def test_outlet_datasets_failed(self, create_task_instance):
         """
         Verify that when we have an outlet dataset on a task, and the task
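
One idiom in the new assertion deserves a note: session.query(Model.column)
returns rows as one-element tuples, hence the (ddrq_timestamp,) unpacking.
A minimal illustration using the stand-in models from the first sketch; the
test asserts strict ordering, but this toy version uses <= since utcnow's
resolution may not separate two back-to-back flushes:

    with Session(engine) as session:
        session.add(Event())
        session.flush()
        session.add_all([QueueRecord(), QueueRecord()])
        session.flush()
        event_ts = session.query(Event.timestamp).scalar()
        rows = session.query(QueueRecord.created_at).all()
        # Each row is a 1-tuple such as (datetime.datetime(...),)
        assert all(event_ts <= created_at for (created_at,) in rows)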