You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:05:28 UTC

[GitHub] [airflow] jhtimmins commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

jhtimmins commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r516181198



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)
+                session.rollback()

Review comment:
       Yeah, that's a good point. Will do.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org