Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/16 16:20:24 UTC

[GitHub] [airflow] jhtimmins opened a new pull request #11589: Perform "mini scheduling run" after task has finished

jhtimmins opened a new pull request #11589:
URL: https://github.com/apache/airflow/pull/11589


   In order to further reduce intra-dag task scheduling lag we add an
   optimization: when a task has just finished executing (success or
   failure) we can look at the downstream tasks of just that task, and then
   make scheduling decisions for those tasks there -- we've already got the
   dag loaded, and we know they are likely actionable as we just finished.
   
   We should set tasks to scheduled if we can (but no further, i.e. not to
   queued, as the scheduler has to make that decision with info about the
   Pool usage etc.).
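
The idea described above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions, not the code in this PR: the state names, the `mini_scheduling_run` helper and the dictionary-based DAG are hypothetical, and readiness is reduced to "all upstream tasks succeeded".

```python
from typing import Dict, List, Set

# Hypothetical state names, chosen to mirror the ones in the description.
NONE, SCHEDULED, SUCCESS = "none", "scheduled", "success"


def mini_scheduling_run(
    finished_task: str,
    downstream: Dict[str, Set[str]],
    upstream: Dict[str, Set[str]],
    states: Dict[str, str],
) -> List[str]:
    """Move direct downstream tasks of ``finished_task`` to SCHEDULED when ready.

    Assumes an "all upstream tasks succeeded" rule. Tasks are never moved to
    a queued state here; that decision is left to the scheduler.
    """
    newly_scheduled = []
    for task_id in sorted(downstream.get(finished_task, set())):
        if states[task_id] != NONE:
            continue  # someone else has already made a decision for this task
        if all(states[up] == SUCCESS for up in upstream[task_id]):
            states[task_id] = SCHEDULED  # and no further
            newly_scheduled.append(task_id)
    return newly_scheduled


# A -> B, A -> C, D -> C: only A has finished so far.
downstream = {"A": {"B", "C"}, "B": set(), "C": set(), "D": {"C"}}
upstream = {"A": set(), "B": {"A"}, "C": {"A", "D"}, "D": set()}
states = {"A": SUCCESS, "B": NONE, "C": NONE, "D": NONE}

result = mini_scheduling_run("A", downstream, upstream, states)
```

Only the direct children of the finished task are examined, so `B` becomes schedulable while `C` stays untouched until `D` has also finished.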
   
   Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins closed pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
jhtimmins closed pull request #11589:
URL: https://github.com/apache/airflow/pull/11589


   





[GitHub] [airflow] jhtimmins commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r513157031



##########
File path: airflow/models/dagrun.py
##########
@@ -380,27 +395,21 @@ def update_state(
         self.last_scheduling_decision = start_dttm
 
         dag = self.get_dag()
-        ready_tis: List[TI] = []
-        tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
-        self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
-        for ti in tis:
-            ti.task = dag.get_task(ti.task_id)
+        info = self.task_instance_scheduling_decisions(session)
+
+        tis = info['tis']
+        schedulable_tis = info['schedulable_tis']
+        changed_tis = info['changed_tis']
+        finished_tasks = info['finished_tasks']
+        unfinished_tasks = info['unfinished_tasks']
 
-        unfinished_tasks = [t for t in tis if t.state in State.unfinished]
-        finished_tasks = [t for t in tis if t.state in State.finished]
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
         none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
-        if unfinished_tasks:
-            scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
-            self.log.debug(
-                "number of scheduleable tasks for %s: %s task(s)",
-                self, len(scheduleable_tasks))
-            ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
-            self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
-            if none_depends_on_past and none_task_concurrency:
-                # small speed up
-                are_runnable_tasks = ready_tis or self._are_premature_tis(
-                    unfinished_tasks, finished_tasks, session) or changed_tis
+
+        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+            # small speed up

Review comment:
       Ah looks like it's there so it's outside the conditional. nvm.







[GitHub] [airflow] github-actions[bot] commented on pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#issuecomment-718962546


   [The Workflow run](https://github.com/apache/airflow/actions/runs/336512927) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#issuecomment-710273706


   [The Workflow run](https://github.com/apache/airflow/actions/runs/311084105) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] turbaszek commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r516152828



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)
+                session.rollback()

Review comment:
       How about encapsulating this into a separate method? `_run_raw_task` is already a long one 😉 

##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)

Review comment:
       As a user, I would be worried about seeing such info logs. Should it be debug? 

##########
File path: airflow/models/dagrun.py
##########
@@ -638,3 +674,52 @@ def get_latest_runs(cls, session=None):
             .all()
         )
         return dagruns
+
+    @provide_session
+    def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -> int:
+        """
+        Set the given task instances in to the scheduled state.
+
+        Each element of ``schedulable_tis`` should have it's ``task`` attribute already set.
+
+        Any DummyOperator without callbacks is instead set straight to the success state.
+
+        All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
+        is the caller's responsibility to call this function only with TIs from a single dag run.
+        """
+        # Get list of TIs that do not need to executed, these are
+        # tasks using DummyOperator and without on_execute_callback / on_success_callback
+        dummy_tis = {
+            ti for ti in schedulable_tis
+            if
+            (
+                ti.task.task_type == "DummyOperator"

Review comment:
       Will it work for operators that inherit from `DummyOperator`?
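
The question can be illustrated with stand-in classes. This is a sketch only: the classes below are not the real Airflow operators, and it assumes that `task_type` is derived from the name of the concrete class. Under that assumption a string comparison matches the class itself but not its subclasses, whereas `isinstance` matches both.

```python
class BaseOperator:
    """Stand-in for illustration; not the real Airflow class."""

    @property
    def task_type(self) -> str:
        # Assumption: the task type is the name of the concrete class.
        return type(self).__name__


class DummyOperator(BaseOperator):
    pass


class MyDummyOperator(DummyOperator):
    """Hypothetical user-defined subclass."""


operators = (DummyOperator(), MyDummyOperator())

# Comparison by class name, as in the diff above.
matches_by_name = [op.task_type == "DummyOperator" for op in operators]

# Comparison by inheritance, for contrast.
matches_by_isinstance = [isinstance(op, DummyOperator) for op in operators]
```

Whether subclasses should also skip execution is a design decision, since a subclass may override `execute` and no longer be a no-op.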

##########
File path: airflow/models/dagrun.py
##########
@@ -43,6 +43,20 @@
 from airflow.utils.types import DagRunType
 
 
+class TISchedulingDecision(NamedTuple):
+    """
+    Type of return for DagRun.task_instance_scheduling_decisions
+
+    This is only used by type checkers, at run time this is a plain dict.

Review comment:
       Is this comment still valid?

##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)

Review comment:
       It will always be a "database exception", which is a rather critical one imho. And here we are telling users "your database refused something but you don't have to worry about it". I think we have to make the message less "critical", like `Skipping mini scheduling run due to exception: %s`. But still, logging the exception will show a problem with the database...
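
One possible shape for the suggestion above, as a sketch rather than the change that was merged: log a short, non-alarming message at info level and keep the traceback at debug level. The `run_optional_step` helper and the logger name are hypothetical, and `RuntimeError` stands in for the database error type.

```python
import logging

log = logging.getLogger("mini_scheduler_sketch")  # hypothetical logger name


def run_optional_step(step) -> bool:
    """Run an optional optimisation; report failure without alarming the user."""
    try:
        step()
        return True
    except RuntimeError as exc:  # stand-in for the DB error type
        # Short message for everyone, full traceback only when debugging.
        log.info("Skipping mini scheduling run due to exception: %s", exc)
        log.debug("Traceback for skipped mini scheduling run", exc_info=True)
        return False


def failing_step():
    raise RuntimeError("simulated lock timeout")


failed_result = run_optional_step(failing_step)
ok_result = run_optional_step(lambda: None)
```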

##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,8 +1136,63 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
 
+        self._run_mini_scheduler_on_child_tasks(session)

Review comment:
       It may happen that we will do a `rollback` on a session we have already committed. Is this expected, @jhtimmins?
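
For background on the question above, here is a small sketch of general transactional behaviour, using the standard-library `sqlite3` module instead of the SQLAlchemy session in the diff (the table and values are made up). A rollback issued after a commit only discards work done since that commit; the committed rows stay as they are.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT, state TEXT)")
conn.execute("INSERT INTO ti VALUES ('A', 'success')")
conn.execute("INSERT INTO ti VALUES ('B', 'none')")
conn.commit()  # the task's own result is now durable

try:
    # Follow-on work in a new transaction, which then fails.
    conn.execute("UPDATE ti SET state = 'scheduled' WHERE task_id = 'B'")
    raise sqlite3.OperationalError("simulated DB error")
except sqlite3.OperationalError:
    conn.rollback()  # undoes only the uncommitted UPDATE

rows = conn.execute("SELECT task_id, state FROM ti ORDER BY task_id").fetchall()
conn.close()
```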







[GitHub] [airflow] ashb merged pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #11589:
URL: https://github.com/apache/airflow/pull/11589


   





[GitHub] [airflow] mik-laj commented on a change in pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r506771453



##########
File path: airflow/models/dagrun.py
##########
@@ -43,6 +44,20 @@
 from airflow.utils.types import DagRunType
 
 
+class _TISchedulingDecision(TypedDict):

Review comment:
       Is there a reason we need to use TypedDict? NamedTuple is much easier to use in many cases.
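
The difference between the two options can be shown with a short sketch. The class names and field values below are hypothetical and cover only two of the fields used in this PR; `TypedDict` requires Python 3.8 or later.

```python
from typing import List, NamedTuple, TypedDict


class DecisionDict(TypedDict):
    tis: List[str]
    schedulable_tis: List[str]


class DecisionTuple(NamedTuple):
    tis: List[str]
    schedulable_tis: List[str]


# A TypedDict only exists for type checkers; at run time it is a plain dict
# and fields are read with string keys.
as_dict: DecisionDict = {"tis": ["A", "B"], "schedulable_tis": ["A"]}
dict_is_plain = type(as_dict) is dict
from_dict = as_dict["schedulable_tis"]

# A NamedTuple is a real class: attribute access, unpacking, immutability.
as_tuple = DecisionTuple(tis=["A", "B"], schedulable_tis=["A"])
from_tuple = as_tuple.schedulable_tis
tis, schedulable_tis = as_tuple
```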







[GitHub] [airflow] mik-laj commented on a change in pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r506771250



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1664,42 +1664,11 @@ def _schedule_dag_run(self, dag_run: DagRun, currently_active_runs: int, session
 
         self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
 
-        # Get list of TIs that do not need to executed, these are
-        # tasks using DummyOperator and without on_execute_callback / on_success_callback
-        dummy_tis = [
-            ti for ti in schedulable_tis
-            if
-            (
-                ti.task.task_type == "DummyOperator"
-                and not ti.task.on_execute_callback
-                and not ti.task.on_success_callback
-            )
-        ]
-
         # This will do one query per dag run. We "could" build up a complex

Review comment:
       It seems to me that we should also move the comment. Now it has lost context.







[GitHub] [airflow] ashb commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r512897303



##########
File path: airflow/models/dagrun.py
##########
@@ -380,27 +395,21 @@ def update_state(
         self.last_scheduling_decision = start_dttm
 
         dag = self.get_dag()
-        ready_tis: List[TI] = []
-        tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
-        self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
-        for ti in tis:
-            ti.task = dag.get_task(ti.task_id)
+        info = self.task_instance_scheduling_decisions(session)
+
+        tis = info['tis']
+        schedulable_tis = info['schedulable_tis']
+        changed_tis = info['changed_tis']
+        finished_tasks = info['finished_tasks']
+        unfinished_tasks = info['unfinished_tasks']
 
-        unfinished_tasks = [t for t in tis if t.state in State.unfinished]
-        finished_tasks = [t for t in tis if t.state in State.finished]
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
         none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
-        if unfinished_tasks:
-            scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
-            self.log.debug(
-                "number of scheduleable tasks for %s: %s task(s)",
-                self, len(scheduleable_tasks))
-            ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
-            self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
-            if none_depends_on_past and none_task_concurrency:
-                # small speed up
-                are_runnable_tasks = ready_tis or self._are_premature_tis(
-                    unfinished_tasks, finished_tasks, session) or changed_tis
+
+        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+            # small speed up

Review comment:
       Bad/lazy refactor most likely.







[GitHub] [airflow] github-actions[bot] commented on pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#issuecomment-710186502









[GitHub] [airflow] ashb commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r513375796



##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)

Review comment:
       ```suggestion
           task_instance_a.task = task_a
   ```

##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)
+
+        task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+        task_instance_b.task = dag.get_task(task_b.task_id)

Review comment:
       ```suggestion
           task_instance_b.task = task_b
   ```

##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)
+
+        task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+        task_instance_b.task = dag.get_task(task_b.task_id)
+
+        schedulable_tis, _ = dag_run.update_state()
+        self.assertEqual(len(schedulable_tis), 1)
+        self.assertEqual(schedulable_tis[0].task_id, task_a.task_id)
+        self.assertEqual(schedulable_tis[0].state, State.NONE)
+
+        dag_run.schedule_tis(schedulable_tis, session)
+        self.validate_ti_states(dag_run, {'A': State.SCHEDULED, 'B': State.NONE, 'C': State.NONE})
+
+        scheduler._critical_section_execute_task_instances(session=session)
+        self.validate_ti_states(dag_run, {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE})

Review comment:
       (For this to work the TI would need to be attached to the session, so you would need to pass `session=session` to `dag_run.get_task_instance`.)

##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)
+
+        task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+        task_instance_b.task = dag.get_task(task_b.task_id)
+
+        schedulable_tis, _ = dag_run.update_state()
+        self.assertEqual(len(schedulable_tis), 1)
+        self.assertEqual(schedulable_tis[0].task_id, task_a.task_id)
+        self.assertEqual(schedulable_tis[0].state, State.NONE)
+
+        dag_run.schedule_tis(schedulable_tis, session)
+        self.validate_ti_states(dag_run, {'A': State.SCHEDULED, 'B': State.NONE, 'C': State.NONE})
+
+        scheduler._critical_section_execute_task_instances(session=session)
+        self.validate_ti_states(dag_run, {'A': State.QUEUED, 'B': State.NONE, 'C': State.NONE})

Review comment:
       Do we need this block? I think this test would be clearer if we just set task_instance_a directly to a runnable state. For example:
   
   
   ```python
           task_instance_a.state = State.QUEUED
           session.commit()
   ```
   
   My reasoning is that this is the "pre-condition/setup" for the test, not part of what we are actually testing, so by having these asserts and calling the scheduler job code we are not testing this feature in isolation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r506970169



##########
File path: airflow/models/dagrun.py
##########
@@ -43,6 +44,20 @@
 from airflow.utils.types import DagRunType
 
 
+class _TISchedulingDecision(TypedDict):

Review comment:
       No reason -- just happened to be the example we were looking at in PoolStats and copied that.







[GitHub] [airflow] ashb commented on a change in pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r512603098



##########
File path: airflow/models/dagrun.py
##########
@@ -466,7 +475,35 @@ def update_state(
 
         session.merge(self)
 
-        return ready_tis, callback
+        return schedulable_tis, callback
+
+    @provide_session
+    def task_instance_scheduling_decisions(self, session: Session = None) -> _TISchedulingDecision:
+
+        schedulable_tis: List[TI] = []
+        changed_tis = False
+
+        tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
+        self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
+        for ti in tis:
+            ti.task = self.get_dag().get_task(ti.task_id)
+
+        unfinished_tasks = [t for t in tis if t.state in State.unfinished]
+        finished_tasks = [t for t in tis if t.state in State.finished | {State.UPSTREAM_FAILED}]

Review comment:
       ```suggestion
           finished_tasks = [t for t in tis if t.state in State.finished]
   ```
   
   I added UPSTREAM_FAILED to `finished` in a PR that's already been merged.
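
To illustrate the suggestion above: once `UPSTREAM_FAILED` is a member of the finished set, the extra union no longer changes the result. This is a minimal sketch using a hypothetical stand-in `State` class; the member list is illustrative and is not Airflow's actual definition.

```python
# Hypothetical stand-in for Airflow's State class; members are illustrative only.
class State:
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"
    RUNNING = "running"

    # Assumption: UPSTREAM_FAILED is already part of `finished`,
    # as described in the review comment.
    finished = frozenset([SUCCESS, FAILED, SKIPPED, UPSTREAM_FAILED])


states = [State.SUCCESS, State.RUNNING, State.UPSTREAM_FAILED]

# Filtering with the explicit union and without it gives the same list.
with_union = [s for s in states if s in State.finished | {State.UPSTREAM_FAILED}]
without_union = [s for s in states if s in State.finished]
```

Dropping the union also avoids building a new set on every iteration of the comprehension.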







[GitHub] [airflow] ashb commented on a change in pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r506969739



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1664,42 +1664,11 @@ def _schedule_dag_run(self, dag_run: DagRun, currently_active_runs: int, session
 
         self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
 
-        # Get list of TIs that do not need to executed, these are
-        # tasks using DummyOperator and without on_execute_callback / on_success_callback
-        dummy_tis = [
-            ti for ti in schedulable_tis
-            if
-            (
-                ti.task.task_type == "DummyOperator"
-                and not ti.task.on_execute_callback
-                and not ti.task.on_success_callback
-            )
-        ]
-
         # This will do one query per dag run. We "could" build up a complex

Review comment:
       This comment doesn't make sense on an instance method of DagRun, as that almost by definition only operates on a single dag run. The comment is kept here in the scheduler because that's where one might think we want to batch the queries up, but shouldn't.







[GitHub] [airflow] ashb commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r515934876



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,59 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                tasks = self.task.downstream_list + [self.task]
+                task_ids = [t.task_id for t in tasks]
+
+                partial_dag = self.task.dag.partial_subset(
+                    task_ids,
+                    include_downstream=True,

Review comment:
       Hmmmm, this is going to include _all_ downstream tasks, recursively, not just the tasks we've selected.
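
The difference the comment points at, direct children versus everything reachable downstream, can be sketched on a toy graph. The adjacency list and both helper functions below are hypothetical and only illustrate the two notions; they are not how `partial_subset` is implemented.

```python
from typing import Dict, List, Set

# Toy DAG as an adjacency list: A >> B >> C and A >> D (hypothetical task ids).
DOWNSTREAM: Dict[str, List[str]] = {"A": ["B", "D"], "B": ["C"], "C": [], "D": []}


def direct_downstream(task_id: str) -> Set[str]:
    """Return only the immediate children of task_id."""
    return set(DOWNSTREAM[task_id])


def all_downstream(task_id: str) -> Set[str]:
    """Return every task reachable from task_id (iterative depth-first walk)."""
    seen: Set[str] = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen
```

For task `A`, the direct set contains `B` and `D`, while the recursive set also pulls in `C`, which cannot be actionable until `B` has run.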

##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)

Review comment:
       I think it depends on what the exception was.

##########
File path: airflow/models/dagrun.py
##########
@@ -638,3 +674,52 @@ def get_latest_runs(cls, session=None):
             .all()
         )
         return dagruns
+
+    @provide_session
+    def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -> int:
+        """
+        Set the given task instances in to the scheduled state.
+
+        Each element of ``schedulable_tis`` should have it's ``task`` attribute already set.
+
+        Any DummyOperator without callbacks is instead set straight to the success state.
+
+        All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
+        is the caller's responsibility to call this function only with TIs from a single dag run.
+        """
+        # Get list of TIs that do not need to executed, these are
+        # tasks using DummyOperator and without on_execute_callback / on_success_callback
+        dummy_tis = {
+            ti for ti in schedulable_tis
+            if
+            (
+                ti.task.task_type == "DummyOperator"

Review comment:
       No, there is already an issue for that though

##########
File path: airflow/models/dagrun.py
##########
@@ -43,6 +43,20 @@
 from airflow.utils.types import DagRunType
 
 
+class TISchedulingDecision(NamedTuple):
+    """
+    Type of return for DagRun.task_instance_scheduling_decisions
+
+    This is only used by type checkers, at run time this is a plain dict.

Review comment:
       No, not any more. Good catch
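
Why the docstring went stale when the return type moved from `TypedDict` to `NamedTuple` can be seen in a small sketch. The class names and fields here are hypothetical simplifications, not the PR's actual definitions. It matches the two access styles that appear elsewhere in this thread: `info['schedulable_tis']` and `info.schedulable_tis`.

```python
from typing import List, NamedTuple, TypedDict


class DecisionDict(TypedDict):
    schedulable_tis: List[str]
    changed_tis: bool


class DecisionTuple(NamedTuple):
    schedulable_tis: List[str]
    changed_tis: bool


as_dict = DecisionDict(schedulable_tis=["B"], changed_tis=False)
as_tuple = DecisionTuple(schedulable_tis=["B"], changed_tis=False)

# A TypedDict value is a plain dict at run time and is read by key.
dict_is_plain = type(as_dict) is dict
from_dict = as_dict["schedulable_tis"]

# A NamedTuple value is an instance of its own tuple subclass and is read by attribute.
tuple_is_typed = isinstance(as_tuple, DecisionTuple)
from_tuple = as_tuple.schedulable_tis
```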

##########
File path: airflow/models/dagrun.py
##########
@@ -638,3 +674,52 @@ def get_latest_runs(cls, session=None):
             .all()
         )
         return dagruns
+
+    @provide_session
+    def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -> int:
+        """
+        Set the given task instances in to the scheduled state.
+
+        Each element of ``schedulable_tis`` should have it's ``task`` attribute already set.
+
+        Any DummyOperator without callbacks is instead set straight to the success state.
+
+        All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it
+        is the caller's responsibility to call this function only with TIs from a single dag run.
+        """
+        # Get list of TIs that do not need to executed, these are
+        # tasks using DummyOperator and without on_execute_callback / on_success_callback
+        dummy_tis = {
+            ti for ti in schedulable_tis
+            if
+            (
+                ti.task.task_type == "DummyOperator"

Review comment:
       #11393

##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)

Review comment:
       The most likely case I expect here is a "cannot reach DB" network error.
   
   But yeah, I like your message better.
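
The "non-fatal optimisation" pattern discussed here can be sketched with the standard library alone. This example swaps in `sqlite3` for SQLAlchemy so that it is self-contained. The function name and log message are hypothetical, and `sqlite3.OperationalError` only stands in for the SQLAlchemy exception caught in the diff.

```python
import logging
import sqlite3

log = logging.getLogger(__name__)


def best_effort_execute(conn: sqlite3.Connection, sql: str) -> bool:
    """Run an optional statement; log and roll back on a DB error instead of raising."""
    try:
        conn.execute(sql)
        conn.commit()
        return True
    except sqlite3.OperationalError:
        # The work is only an optimisation, so the caller carries on without it.
        log.info("Skipping optional DB work due to error", exc_info=True)
        conn.rollback()
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT, state TEXT)")

ok = best_effort_execute(conn, "INSERT INTO ti VALUES ('B', 'scheduled')")
failed = best_effort_execute(conn, "UPDATE missing_table SET state = 'scheduled'")
```

Rolling back in the handler leaves the connection usable for whatever the caller does next.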







[GitHub] [airflow] github-actions[bot] commented on pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#issuecomment-720559967


   The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!





[GitHub] [airflow] github-actions[bot] commented on pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#issuecomment-717689029


   [The Workflow run](https://github.com/apache/airflow/actions/runs/332862385) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] jhtimmins commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r516181198



##########
File path: airflow/models/taskinstance.py
##########
@@ -1135,7 +1136,53 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
+
         session.commit()
+        if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            try:
+                # Re-select the row with a lock
+                dag_run = with_row_locks(session.query(DagRun).filter_by(
+                    dag_id=self.dag_id,
+                    execution_date=self.execution_date,
+                )).one()
+
+                # Get a partial dag with just the specific tasks we want to
+                # examine. In order for dep checks to work correctly, we
+                # include ourself (so TriggerRuleDep can check the state of the
+                # task we just executed)
+                partial_dag = self.task.dag.partial_subset(
+                    self.task.downstream_task_ids,
+                    include_downstream=False,
+                    include_upstream=False,
+                    include_direct_upstream=True,
+                )
+
+                dag_run.dag = partial_dag
+                info = dag_run.task_instance_scheduling_decisions(session)
+
+                skippable_task_ids = {
+                    task_id
+                    for task_id in partial_dag.task_ids
+                    if task_id not in self.task.downstream_task_ids
+                }
+
+                schedulable_tis = [
+                    ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids
+                ]
+                for schedulable_ti in schedulable_tis:
+                    if not hasattr(schedulable_ti, "task"):
+                        schedulable_ti.task = self.task.dag.get_task(schedulable_ti.task_id)
+
+                num = dag_run.schedule_tis(schedulable_tis)
+                self.log.info("%d downstream tasks scheduled from follow-on schedule check", num)
+
+                session.commit()
+            except OperationalError:
+                # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
+                self.log.info("DB error when checking downstream tasks ignored", exc_info=True)
+                session.rollback()

Review comment:
       Yeah that's a good point. Will do.







[GitHub] [airflow] jhtimmins commented on a change in pull request #11589: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r512895746



##########
File path: airflow/models/dagrun.py
##########
@@ -380,27 +395,21 @@ def update_state(
         self.last_scheduling_decision = start_dttm
 
         dag = self.get_dag()
-        ready_tis: List[TI] = []
-        tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
-        self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
-        for ti in tis:
-            ti.task = dag.get_task(ti.task_id)
+        info = self.task_instance_scheduling_decisions(session)
+
+        tis = info['tis']
+        schedulable_tis = info['schedulable_tis']
+        changed_tis = info['changed_tis']
+        finished_tasks = info['finished_tasks']
+        unfinished_tasks = info['unfinished_tasks']
 
-        unfinished_tasks = [t for t in tis if t.state in State.unfinished]
-        finished_tasks = [t for t in tis if t.state in State.finished]
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
         none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
-        if unfinished_tasks:
-            scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
-            self.log.debug(
-                "number of scheduleable tasks for %s: %s task(s)",
-                self, len(scheduleable_tasks))
-            ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
-            self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
-            if none_depends_on_past and none_task_concurrency:
-                # small speed up
-                are_runnable_tasks = ready_tis or self._are_premature_tis(
-                    unfinished_tasks, finished_tasks, session) or changed_tis
+
+        if unfinished_tasks and none_depends_on_past and none_task_concurrency:
+            # small speed up

Review comment:
       @ashb It looks like `are_runnable_tasks` isn't used until line 456. Is there a reason this conditional needs to be here rather than right before it's used?







[GitHub] [airflow] github-actions[bot] commented on pull request #11589: WIP: Perform "mini scheduling run" after task has finished

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#issuecomment-717009726


   [The Workflow run](https://github.com/apache/airflow/actions/runs/330676749) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.

