You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/05/01 16:22:01 UTC

[GitHub] [airflow] andriisoldatenko commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

andriisoldatenko commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r280125168
 
 

 ##########
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##########
 @@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
             yield self._passing_status(reason="The task had a dummy trigger rule set.")
             return
 
-        # TODO(unknown): this query becomes quite expensive with dags that have many
-        # tasks. It should be refactored to let the task report to the dag run and get the
-        # aggregates from there.
-        qry = (
-            session
-            .query(
-                func.coalesce(func.sum(
-                    case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-                func.coalesce(func.sum(
-                    case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-                func.coalesce(func.sum(
-                    case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-                func.coalesce(func.sum(
-                    case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-                func.count(TI.task_id),
+        successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+        if dep_context.finished_tasks is None:
+            qry = (
+                session
+                .query(
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+                    func.count(TI.task_id),
+                )
+                .filter(
+                    TI.dag_id == ti.dag_id,
+                    TI.task_id.in_(ti.task.upstream_task_ids),
+                    TI.execution_date == ti.execution_date,
+                    TI.state.in_(State.finished()),
+                )
             )
-            .filter(
-                TI.dag_id == ti.dag_id,
-                TI.task_id.in_(ti.task.upstream_task_ids),
-                TI.execution_date == ti.execution_date,
-                TI.state.in_([
-                    State.SUCCESS, State.FAILED,
-                    State.UPSTREAM_FAILED, State.SKIPPED]),
-            )
-        )
+            successes, skipped, failed, upstream_failed, done = qry.first()
+        else:
+            # see if the task name is in the task upstream for our task
+            upstream_tasks = [finished_task for finished_task in dep_context.finished_tasks
+                              if finished_task.task_id in ti.task.upstream_task_ids]
+            if upstream_tasks:
 
 Review comment:
  I was wondering about this: before this change we had only a SQL query, but now it's both Python and SQL.
   🤔 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services