Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/17 01:18:34 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

ephraimbuddy opened a new pull request #18310:
URL: https://github.com/apache/airflow/pull/18310


   When `wait_for_downstream` is set on a task, the mini scheduler does not respect it
   and goes ahead and schedules task instances that are not yet runnable.
   
   This PR fixes that by checking the dependency in the mini scheduler.
   
   Closes: https://github.com/apache/airflow/issues/18229
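
As a rough illustration of the check this PR describes, the sketch below models the guard in plain Python, without importing Airflow. The function name `should_skip_mini_scheduler` and its three boolean parameters are hypothetical stand-ins for the task's `wait_for_downstream` flag, the existence of a previous task instance, and the result of `are_dependents_done()`; the actual change is the condition added to `_run_mini_scheduler_on_child_tasks` in `airflow/jobs/local_task_job.py`.

```python
def should_skip_mini_scheduler(
    wait_for_downstream: bool,
    has_previous_ti: bool,
    dependents_done: bool,
) -> bool:
    """Return True when downstream tasks should NOT be scheduled yet.

    Hypothetical helper that mirrors the three-part condition in the diff.
    """
    return wait_for_downstream and has_previous_ti and not dependents_done


# Flag set, a previous task instance exists, dependents unfinished: skip.
print(should_skip_mini_scheduler(True, True, False))
# No previous task instance (first run): the guard does not trigger.
print(should_skip_mini_scheduler(True, False, False))
# Flag not set: the guard never triggers.
print(should_skip_mini_scheduler(False, True, False))
```

Keeping the guard as a single boolean expression makes it cheap to evaluate before the mini scheduler does any further work.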
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#issuecomment-921521835


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy merged pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #18310:
URL: https://github.com/apache/airflow/pull/18310


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711364412



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       Thanks, Kaxil, for looking into this.







[GitHub] [airflow] ephraimbuddy commented on pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#issuecomment-921380276


   You can reproduce the behaviour with this DAG:
   ```python
   import datetime
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   
   
   default_args = {'wait_for_downstream': True, 'provide_context': False}
   
   with DAG(
       'test_downstream',
       start_date=datetime.datetime(2021, 9, 13),
       schedule_interval="@daily",
       default_args=default_args,
       catchup=True) as dag:
   
       task1 = BashOperator(task_id='task_1', bash_command='sleep 5')
   
       task2 = BashOperator(task_id='task_2', bash_command='sleep 5')
   
       # task_3 sleeps longer than the other tasks
       task3 = BashOperator(task_id='task_3', bash_command='sleep 20')
   
       task4 = BashOperator(task_id='task_4', bash_command='sleep 5')
   
       task5 = BashOperator(task_id='task_5', bash_command='sleep 5')
   
       task6 = BashOperator(task_id='task_6', bash_command='sleep 5')
   
       task1 >> task2 >> task3 >> task4 >> task5 >> task6
   ```





[GitHub] [airflow] kosteev commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kosteev commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711101901



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       Isn't this the same logic that "task_instance_scheduling_decisions" (line 252 below) performs when it calls "_get_ready_tis", which checks dependencies?
   Can you please explain why that check doesn't work there, and why we should add this check here instead of fixing it? It looks like a bug in "task_instance_scheduling_decisions" when it is called from the mini scheduler.
   @ephraimbuddy @kaxil







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711363194



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       This check is needed here even though task_instance_scheduling_decisions also performs it.
   
   With the mini scheduler disabled, the scheduler works fine with wait_for_downstream set.
   
   Possibly a race condition?







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710956225



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):
+                self.log.info(
+                    "No downstream tasks scheduled because task instance "
+                    "dependents are still running and wait_for_downstream is true"

Review comment:
       ```suggestion
                       "dependents have not completed yet and wait_for_downstream is true"
   ```
   
   since the task can be in the queued or scheduled state too
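
The suggested wording matters because "not completed" covers more than "running". A minimal sketch in plain Python follows, assuming as a simplification (not a quote of Airflow's implementation) that a dependent counts as done only when its state is `success` or `skipped`. The state strings and the helper `dependents_done` are illustrative.

```python
from typing import Iterable

# Assumed set of "finished" states, for this sketch only.
DONE_STATES = {"success", "skipped"}


def dependents_done(downstream_states: Iterable[str]) -> bool:
    """True only if every downstream task instance is in a done state."""
    return all(state in DONE_STATES for state in downstream_states)


# Queued and scheduled dependents are unfinished, just like running ones.
for states in (["running"], ["queued"], ["scheduled"], ["success", "skipped"]):
    label = "done" if dependents_done(states) else "not completed yet"
    print(states, "->", label)
```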







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710965265



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (

Review comment:
       The state should be success here. It’s this task that would run the mini scheduler on exit







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711291951



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       Good point, I will take a look







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710960302



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (

Review comment:
       ```suggestion
           assert ti2_a.state == State.NONE
           assert (
   ```

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()

Review comment:
       why do we need this?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710965460



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()

Review comment:
       Copied :) will remove it







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711362977



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       https://github.com/apache/airflow/pull/18338 should take care of it @kosteev 







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710968692



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (

Review comment:
       my bad, I meant `ti2_b`:
   
   assert ti2_b.state == State.NONE







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710957411



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)

Review comment:
       ```suggestion
           with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
               task_a = PythonOperator(task_id='A', python_callable=lambda: True)
               task_b = PythonOperator(task_id='B', python_callable=lambda: True)
               task_c = PythonOperator(task_id='C', python_callable=lambda: True)
               task_a >> task_b >> task_c
   ```
   
   for simplicity
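
The chained form works because `>>` returns its right-hand operand, so each link becomes the upstream of the next. The toy `Task` class below is a hypothetical stand-in, not Airflow's operator class. It sketches the idea with `__rshift__` and produces the same edges as the mapping `{'A': 'B', 'B': 'C'}` used in the original loop.

```python
class Task:
    """Toy stand-in for an operator; it only tracks downstream task ids."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream_ids: list = []

    def __rshift__(self, other: "Task") -> "Task":
        # Record the edge, then return the right-hand task so chains compose.
        self.downstream_ids.append(other.task_id)
        return other


task_a, task_b, task_c = Task("A"), Task("B"), Task("C")
task_a >> task_b >> task_c

# Collect the edges that the chain created.
edges = {t.task_id: t.downstream_ids for t in (task_a, task_b, task_c)}
print(edges)
```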







[GitHub] [airflow] kosteev commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kosteev commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711529659



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       Thank you guys!







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710968692



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (

Review comment:
       my bad, I meant `ti2_b`:
   
   
   ```suggestion
           assert ti2_b.state == State.NONE
           assert (
   ```

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (

Review comment:
       my bad, I meant `ti2_b`:
   
   
   ```suggestion
           assert ti2_b.state == State.NONE
           assert (
   ```

##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (

Review comment:
       my bad, I meant `ti2_b`:
   
   
   ```suggestion
           assert ti2_b.state == State.NONE
           assert (
   ```
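
   The `==` in this suggestion matters: a bare assignment such as `ti2_b.state = State.NONE` would overwrite the value under test instead of checking it, so the test would pass even if the mini scheduler had wrongly scheduled `B`. A minimal sketch of the difference, using a hypothetical stand-in class (`FakeTI`, `STATE_NONE` and `STATE_SCHEDULED` are illustrative names, not Airflow's API):

```python
# Hypothetical state constants for illustration only.
STATE_NONE = None
STATE_SCHEDULED = "scheduled"


class FakeTI:
    """Hypothetical stand-in for a task instance; it only carries a state."""

    def __init__(self, state):
        self.state = state


def check_by_assignment(ti):
    # Overwrites the state instead of inspecting it, so a wrongly
    # scheduled task instance would go unnoticed.
    ti.state = STATE_NONE
    return True


def check_by_comparison(ti):
    # Compares without mutating, which is what a test assertion needs.
    return ti.state == STATE_NONE
```

   With a task instance that was (incorrectly) scheduled, only the comparison reports the problem; the assignment silently resets the state.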







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710960653



##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        dep = {'A': 'B', 'B': 'C'}
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            for upstream, downstream in dep.items():
+                dag.set_dependency(upstream, downstream)
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        settings.engine.dispose()
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert (
+            "No downstream tasks scheduled because task instance "
+            "dependents are still running and wait_for_downstream is true"

Review comment:
       If you update https://github.com/apache/airflow/pull/18310/files#r710956225, you will have to update this message too.
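
       One way to avoid this kind of drift between a log message and the test that asserts on it is to keep the text in a single place. The sketch below is only an illustration under that assumption: `SKIP_MESSAGE`, `log_skip` and `capture_skip_log` are hypothetical names, and the PR itself repeats the literal string in the code and in the test.

```python
import io
import logging

# Hypothetical shared constant; the wording is taken from the test in this PR.
SKIP_MESSAGE = (
    "No downstream tasks scheduled because task instance "
    "dependents are still running and wait_for_downstream is true"
)


def log_skip(logger: logging.Logger) -> None:
    # Production-side code would log the shared constant.
    logger.info(SKIP_MESSAGE)


def capture_skip_log() -> str:
    # Test-side helper: capture what the logger emits into a string buffer.
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger("mini_scheduler_sketch")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        log_skip(logger)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()
```

       A test can then assert `SKIP_MESSAGE in capture_skip_log()`, so rewording the message requires only one change.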







[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711362977



##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):

Review comment:
       https://github.com/apache/airflow/pull/18338 should take care of it
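
       For readers following the thread, the three-part guard in the hunk above can be modelled in isolation. This is a rough sketch, not Airflow code: `SketchTI` and `should_skip_mini_scheduler` are hypothetical names, and the real methods (`get_previous_ti()`, `are_dependents_done()`) are replaced by plain attributes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchTI:
    """Hypothetical stand-in for a task instance; not Airflow's TaskInstance."""

    wait_for_downstream: bool
    previous_ti: Optional["SketchTI"] = None
    dependents_done: bool = True


def should_skip_mini_scheduler(ti: SketchTI) -> bool:
    # Mirrors the condition in the diff: skip scheduling downstream tasks
    # only when the flag is set, a previous task instance exists, and the
    # dependents are not all done.
    return bool(
        ti.wait_for_downstream
        and ti.previous_ti is not None
        and not ti.dependents_done
    )
```

       The first run of a DAG has no previous task instance, so under this model the guard never blocks it, whatever the value of `wait_for_downstream`.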



