Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/08 16:41:37 UTC

[airflow] branch main updated: Fix cartesian join re dataset deps in update_state (#24925)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f595e9031 Fix cartesian join re dataset deps in update_state (#24925)
4f595e9031 is described below

commit 4f595e9031890a335b12e783e5f70b4e5623f856
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Jul 8 09:41:22 2022 -0700

    Fix cartesian join re dataset deps in update_state (#24925)
    
    We use the subquery to get distinct downstream dataset references for the dag (there could be
    multiple tasks that touch the same dataset), then join to DDR to get the dags pointing to
    those datasets.  The subquery avoids a many-to-many join.
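
A minimal sketch of the difference, using the standard-library sqlite3 module rather than Airflow's SQLAlchemy models. The table names, columns, and sample rows below are simplified assumptions for illustration, not the real Airflow schema. The first query mimics a filter on the task-ref table with no join condition, which is what the removed code appears to have produced; the second mirrors the distinct subquery joined to the dag-ref table.

```python
import sqlite3

# Hypothetical, simplified stand-ins for the DatasetTaskRef and DatasetDagRef tables.
SCHEMA = """
CREATE TABLE dataset_task_ref (dataset_id INTEGER, dag_id TEXT, task_id TEXT);
CREATE TABLE dataset_dag_ref (dataset_id INTEGER, dag_id TEXT);
"""

# No join condition between the two tables: every dag-ref row is paired
# with every task-ref row that matches the filter (a cartesian product).
CROSS_JOIN_SQL = """
SELECT ddr.dag_id
FROM dataset_dag_ref AS ddr, dataset_task_ref AS dtr
WHERE dtr.dag_id = ?
"""

# Distinct dataset ids touched by the dag first, then a join on dataset_id.
SUBQUERY_SQL = """
SELECT ddr.dag_id
FROM dataset_dag_ref AS ddr
JOIN (
    SELECT DISTINCT dataset_id
    FROM dataset_task_ref
    WHERE dag_id = ?
) AS sub ON sub.dataset_id = ddr.dataset_id
"""


def build_db():
    """Create an in-memory database with a few illustrative rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO dataset_task_ref VALUES (?, ?, ?)",
        [
            (1, "producer", "t1"),  # two tasks in the same dag
            (1, "producer", "t2"),  # touch the same dataset
            (2, "other_producer", "t1"),
        ],
    )
    conn.executemany(
        "INSERT INTO dataset_dag_ref VALUES (?, ?)",
        [
            (1, "consumer_a"),
            (1, "consumer_b"),
            (2, "unrelated"),  # depends on a dataset "producer" never touches
        ],
    )
    return conn


def dependent_dag_rows(conn, sql, dag_id):
    """Return the raw dag_id rows produced by the given query."""
    return [row[0] for row in conn.execute(sql, (dag_id,))]


conn = build_db()
old_rows = dependent_dag_rows(conn, CROSS_JOIN_SQL, "producer")
new_rows = dependent_dag_rows(conn, SUBQUERY_SQL, "producer")

print(len(old_rows), sorted(set(old_rows)))  # 6 ['consumer_a', 'consumer_b', 'unrelated']
print(len(new_rows), sorted(set(new_rows)))  # 2 ['consumer_a', 'consumer_b']
```

In this toy data the unjoined query returns six rows and includes a dag that has nothing to do with the producer's datasets, while the subquery version returns one row per genuinely dependent dag.
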
---
 airflow/models/dagrun.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 31888fe22e..ab65bbabcf 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -651,14 +651,20 @@ class DagRun(Base, LoggingMixin):
                     if isinstance(obj, Dataset):
                         has_dataset_outlets = True
                         break
-        dependent_dag_ids = []
+        dependent_dag_ids = {}
         if self.dag and has_dataset_outlets:
-            dependent_dag_ids = [
+            subquery = (
+                session.query(DatasetTaskRef.dataset_id)
+                .filter(DatasetTaskRef.dag_id == self.dag_id)
+                .distinct(DatasetTaskRef.dataset_id)
+                .subquery()
+            )
+            dependent_dag_ids = {
                 x.dag_id
                 for x in session.query(DatasetDagRef.dag_id)
-                .filter(DatasetTaskRef.dag_id == self.dag_id)
+                .join(subquery, subquery.c.dataset_id == DatasetDagRef.dataset_id)
                 .all()
-            ]
+            }
 
         from airflow.models.dataset import DatasetDagRunQueue as DDRQ
         from airflow.models.serialized_dag import SerializedDagModel