Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/17 01:18:34 UTC
[GitHub] [airflow] ephraimbuddy opened a new pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
ephraimbuddy opened a new pull request #18310:
URL: https://github.com/apache/airflow/pull/18310
When `wait_for_downstream` is set on a task, the mini scheduler does not respect it
and goes ahead and schedules task instances that cannot run yet.
This PR fixes that by checking the dependency in the mini scheduler.
Closes: https://github.com/apache/airflow/issues/18229
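The guard this PR adds to `_run_mini_scheduler_on_child_tasks` can be modelled without Airflow. The sketch below is a simplified, hypothetical model: `TaskInstanceModel` and `mini_scheduler_should_skip` are illustrative names, and the three booleans stand in for `task.wait_for_downstream`, `get_previous_ti()` and `are_dependents_done()` from the diff of `airflow/jobs/local_task_job.py`.

```python
from dataclasses import dataclass


@dataclass
class TaskInstanceModel:
    """Hypothetical stand-in for the fields the guard reads; not Airflow's TaskInstance."""

    wait_for_downstream: bool
    has_previous_ti: bool  # stands in for get_previous_ti() returning a task instance
    dependents_done: bool  # stands in for the result of are_dependents_done()


def mini_scheduler_should_skip(ti: TaskInstanceModel) -> bool:
    """Return True when the mini scheduler should not schedule downstream tasks.

    Mirrors the three-part condition from the PR: the task opts into
    wait_for_downstream, a previous task instance exists, and the
    dependents are not done yet.
    """
    return ti.wait_for_downstream and ti.has_previous_ti and not ti.dependents_done


# First run of the DAG: there is no previous task instance, so nothing blocks.
print(mini_scheduler_should_skip(TaskInstanceModel(True, False, False)))  # False
# A later run whose dependents are still pending: the mini scheduler bails out.
print(mini_scheduler_should_skip(TaskInstanceModel(True, True, False)))  # True
```

Short-circuiting on `wait_for_downstream` first keeps the extra check free for the common case where the flag is not set.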
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#issuecomment-921521835
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] ephraimbuddy merged pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #18310:
URL: https://github.com/apache/airflow/pull/18310
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711364412
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
Thanks, Kaxil, for looking into this.
[GitHub] [airflow] ephraimbuddy commented on pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#issuecomment-921380276
You can reproduce the behaviour with this DAG:
```python
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# wait_for_downstream is applied to every task in the DAG through default_args.
default_args = {'wait_for_downstream': True, 'provide_context': False}

with DAG(
    'test_downstream',
    start_date=datetime.datetime(2021, 9, 13),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=True,
) as dag:
    task1 = BashOperator(task_id='task_1', bash_command='sleep 5')
    task2 = BashOperator(task_id='task_2', bash_command='sleep 5')
    task3 = BashOperator(task_id='task_3', bash_command='sleep 20')
    task4 = BashOperator(task_id='task_4', bash_command='sleep 5')
    task5 = BashOperator(task_id='task_5', bash_command='sleep 5')
    task6 = BashOperator(task_id='task_6', bash_command='sleep 5')
    task1 >> task2 >> task3 >> task4 >> task5 >> task6
```
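The behaviour this DAG should exhibit can be sketched with a small, Airflow-free model. It assumes every task in the linear chain has `wait_for_downstream=True`, and that a task in the next run may start only if its upstream task in the same run succeeded, the same task in the previous run succeeded, and that task's immediate downstream task in the previous run succeeded. Skipped states, pools and retries are ignored, and `runnable_in_next_run` is a hypothetical helper, not Airflow code.

```python
from typing import Dict, List, Optional

SUCCESS = "success"


def runnable_in_next_run(
    chain: List[str],
    prev_states: Dict[str, Optional[str]],
    cur_states: Dict[str, Optional[str]],
) -> List[str]:
    """Return tasks of the next run that may start (simplified model)."""
    runnable = []
    for i, task in enumerate(chain):
        if cur_states.get(task) is not None:
            continue  # already scheduled, running or finished in this run
        # Normal upstream dependency inside the current run.
        upstream_ok = i == 0 or cur_states.get(chain[i - 1]) == SUCCESS
        # The same task in the previous run must have succeeded ...
        prev_self_ok = prev_states.get(task) == SUCCESS
        # ... and so must its immediate downstream task in the previous run.
        prev_downstream = chain[i + 1 : i + 2]
        prev_down_ok = all(prev_states.get(t) == SUCCESS for t in prev_downstream)
        if upstream_ok and prev_self_ok and prev_down_ok:
            runnable.append(task)
    return runnable


chain = [f"task_{n}" for n in range(1, 7)]
# Previous run: task_3 (sleep 20) is still running.
prev = {"task_1": SUCCESS, "task_2": SUCCESS, "task_3": "running"}
# Next run: task_1 has just finished, which is when the mini scheduler fires.
cur = {"task_1": SUCCESS}
print(runnable_in_next_run(chain, prev, cur))  # []
```

Under these assumptions, nothing in the next run is runnable while `task_3` of the previous run is still sleeping, so the mini scheduler should not schedule the next run's `task_2` at that point.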
[GitHub] [airflow] kosteev commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kosteev commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711101901
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
Isn't this logic already handled by "task_instance_scheduling_decisions" (line 252 below), which calls "_get_ready_tis" and checks dependencies?
Could you please explain why the check doesn't work there, and why we should add it here instead of fixing it there? It looks like a bug in "task_instance_scheduling_decisions" when it is called from the mini scheduler.
@ephraimbuddy @kaxil
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711363194
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
This check is needed here even though task_instance_scheduling_decisions also performs it.
With the mini scheduler disabled, the scheduler handles wait_for_downstream correctly.
Possibly a race condition?
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710956225
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
+ self.log.info(
+ "No downstream tasks scheduled because task instance "
+ "dependents are still running and wait_for_downstream is true"
Review comment:
```suggestion
"dependents have not completed yet and wait_for_downstream is true"
```
since the task can also be in the queued or scheduled state.
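The wording matters because "not done" covers every non-terminal state, not only running. A minimal sketch, assuming for illustration that a dependent counts as done only in the success or skipped state; `DONE_STATES` and `dependents_done` are hypothetical names, not Airflow's implementation:

```python
from typing import Iterable

# Assumption: only these states count as "done" for wait_for_downstream.
DONE_STATES = {"success", "skipped"}


def dependents_done(downstream_states: Iterable[str]) -> bool:
    """Simplified model: every immediate dependent must be in a done state."""
    return all(state in DONE_STATES for state in downstream_states)


# Only "success" prints True; scheduled and queued block just like running.
for state in ["scheduled", "queued", "running", "success"]:
    print(state, dependents_done([state]))
```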
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710965265
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
+ job1.run()
+ ti2_a.refresh_from_db()
+ assert (
Review comment:
The state should be success here: this is the task that runs the mini scheduler on exit.
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711291951
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
Good point, I will take a look
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710960302
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
+ job1.run()
+ ti2_a.refresh_from_db()
+ assert (
Review comment:
```suggestion
assert ti2_a.state == State.NONE
assert (
```
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
Review comment:
why do we need this?
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710965460
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
Review comment:
Copied :) will remove it
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711362977
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
https://github.com/apache/airflow/pull/18338 should take care of it @kosteev
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710968692
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
+ job1.run()
+ ti2_a.refresh_from_db()
+ assert (
Review comment:
my bad, I meant `ti2_b`:
assert ti2_b.state == State.NONE
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710957411
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
Review comment:
```suggestion
with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
task_a = PythonOperator(task_id='A', python_callable=lambda: True)
task_b = PythonOperator(task_id='B', python_callable=lambda: True)
task_c = PythonOperator(task_id='C', python_callable=lambda: True)
task_a >> task_b >> task_c
```
for simplicity
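The suggested `task_a >> task_b >> task_c` builds the same A to B to C edges as the dict loop because `>>` is left-associative and each call returns its right-hand operand. The toy `Node` class below is hypothetical and only illustrates that mechanism; Airflow's operators overload `>>` along similar lines, but this sketch does not reproduce their implementation.

```python
from typing import List


class Node:
    """Hypothetical task node; illustrates how `a >> b >> c` chains dependencies."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List[str] = []

    def __rshift__(self, other: "Node") -> "Node":
        # Record the edge, then return the right-hand operand so the
        # next `>>` in the expression continues from it.
        self.downstream.append(other.task_id)
        return other


a, b, c = Node("A"), Node("B"), Node("C")
a >> b >> c  # evaluated as (a >> b) >> c, and (a >> b) returns b
print(a.downstream, b.downstream, c.downstream)  # ['B'] ['C'] []
```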
[GitHub] [airflow] kosteev commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kosteev commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711529659
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
Thank you guys!
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710968692
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
+ job1.run()
+ ti2_a.refresh_from_db()
+ assert (
Review comment:
my bad, I meant `ti2_b`:
```suggestion
assert ti2_b.state == State.NONE
assert (
```
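To see why the suggested assertion expects `ti2_b` to stay in `State.NONE`, here is a minimal model of the scenario this test sets up. It is a sketch, not Airflow code. The helper `may_run` and the plain state dictionaries are hypothetical. It assumes the documented `wait_for_downstream` semantics: the flag forces `depends_on_past`, and a task instance waits until the previous run's instance of the same task and that instance's immediate downstream tasks have succeeded or been skipped. It also assumes the default `all_success` trigger rule, and that `ti2_a` ends in `success` after `job1.run()`.

```python
from typing import Dict, Optional, Set

# The linear DAG from the test: A >> B >> C.
UPSTREAM: Dict[str, Set[str]] = {"A": set(), "B": {"A"}, "C": {"B"}}
DOWNSTREAM: Dict[str, Set[str]] = {"A": {"B"}, "B": {"C"}, "C": set()}

# States that count as "done" for depends_on_past / wait_for_downstream.
DONE = {"success", "skipped"}

RunStates = Dict[str, Optional[str]]


def may_run(task_id: str, current: RunStates, previous: Optional[RunStates]) -> bool:
    """Hypothetical check: may task_id run in `current`, given the previous run?"""
    # Assumed default trigger rule (all_success): every upstream task in
    # the current run must have succeeded.
    if any(current[up] != "success" for up in UPSTREAM[task_id]):
        return False
    if previous is not None:
        # depends_on_past (forced on by wait_for_downstream): the previous
        # run's instance of this task must be done.
        if previous[task_id] not in DONE:
            return False
        # wait_for_downstream: the previous instance's immediate downstream
        # tasks must be done as well.
        if any(previous[down] not in DONE for down in DOWNSTREAM[task_id]):
            return False
    return True


# States from the test: run "test_1" is still busy with C; in run "test_2",
# A is assumed to have succeeded after LocalTaskJob ran it.
run1: RunStates = {"A": "success", "B": "success", "C": "running"}
run2: RunStates = {"A": "success", "B": None, "C": None}

print(may_run("B", run2, run1))  # False: C in the previous run is still running
print(may_run("B", run2, {**run1, "C": "success"}))  # True once C finishes
```

In this model, B in the second run is blocked by C in the first run, not by anything in its own run. That blocked instance is what the mini scheduler must not schedule, so `ti2_b` is expected to remain `NONE`.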
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r710960653
##########
File path: tests/jobs/test_local_task_job.py
##########
@@ -707,6 +707,48 @@ def test_fast_follow(
if scheduler_job.processor_agent:
scheduler_job.processor_agent.end()
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+ session = settings.Session()
+ dep = {'A': 'B', 'B': 'C'}
+ with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+ for upstream, downstream in dep.items():
+ dag.set_dependency(upstream, downstream)
+
+ scheduler_job = SchedulerJob(subdir=os.devnull)
+ scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr2 = dag.create_dagrun(
+ run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ )
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+ ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+ ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+ ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti2_a)
+ session.merge(ti2_b)
+ session.merge(ti2_c)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ settings.engine.dispose()
+ job1.run()
+ ti2_a.refresh_from_db()
+ assert (
+ "No downstream tasks scheduled because task instance "
+ "dependents are still running and wait_for_downstream is true"
Review comment:
If you update https://github.com/apache/airflow/pull/18310/files#r710956225, you will have to update this message too
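The comment above warns that the log text and the test assertion can drift apart. One way to prevent that is to define the message once and reference it from both the code and the test. This is only a sketch. The constant `WAIT_FOR_DOWNSTREAM_SKIP_MSG`, the function `maybe_schedule_children`, and the logger name are hypothetical and not part of this PR. The message text is copied from the assertion in the diff.

```python
import logging

log = logging.getLogger("local_task_job_sketch")

# Hypothetical shared constant; the text matches the string the test asserts on.
WAIT_FOR_DOWNSTREAM_SKIP_MSG = (
    "No downstream tasks scheduled because task instance "
    "dependents are still running and wait_for_downstream is true"
)


def maybe_schedule_children(dependents_done: bool) -> bool:
    """Hypothetical stand-in for the mini scheduler entry point.

    Returns True if it would go on to schedule child tasks.
    """
    if not dependents_done:
        log.info(WAIT_FOR_DOWNSTREAM_SKIP_MSG)
        return False
    return True


# Test side (pytest style, mirroring the `caplog` fixture the real test uses):
# importing the constant means rewording the message cannot break the test.
def test_skip_message_logged(caplog):
    with caplog.at_level(logging.INFO):
        assert maybe_schedule_children(dependents_done=False) is False
    assert WAIT_FOR_DOWNSTREAM_SKIP_MSG in caplog.text
```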
[GitHub] [airflow] kaxil commented on a change in pull request #18310: Fix mini scheduler not respecting `wait_for_downstream` dependency
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #18310:
URL: https://github.com/apache/airflow/pull/18310#discussion_r711362977
##########
File path: airflow/jobs/local_task_job.py
##########
@@ -226,6 +226,18 @@ def heartbeat_callback(self, session=None):
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
try:
+
+ if (
+ self.task_instance.task.wait_for_downstream
+ and self.task_instance.get_previous_ti()
+ and not self.task_instance.are_dependents_done()
+ ):
Review comment:
https://github.com/apache/airflow/pull/18338 should take care of it
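The guard in the diff chains three checks with `and`. Because Python short-circuits, `get_previous_ti()` is only called when `wait_for_downstream` is set, and `are_dependents_done()` is only called when a previous task instance exists. In Airflow both methods take a database session, so the ordering keeps the cheap attribute check first. The sketch below reproduces only that ordering. It uses a hypothetical `FakeTI` stand-in that counts calls so the short-circuit is visible. It flattens `task.wait_for_downstream` onto the stand-in and does not model what the real methods query.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class FakeTI:
    """Hypothetical stand-in for a TaskInstance; only what the guard touches."""

    wait_for_downstream: bool
    previous: Optional[object]
    dependents_done: bool
    calls: Dict[str, int] = field(
        default_factory=lambda: {"get_previous_ti": 0, "are_dependents_done": 0}
    )

    def get_previous_ti(self) -> Optional[object]:
        self.calls["get_previous_ti"] += 1
        return self.previous

    def are_dependents_done(self) -> bool:
        self.calls["are_dependents_done"] += 1
        return self.dependents_done


def should_skip_mini_scheduler(ti: FakeTI) -> bool:
    # Same condition shape as the diff: the cheap attribute check comes first,
    # then the calls that would need a DB session in Airflow. bool() is needed
    # because `and` returns the last evaluated operand (e.g. None).
    return bool(
        ti.wait_for_downstream
        and ti.get_previous_ti()
        and not ti.are_dependents_done()
    )
```

For example, with `wait_for_downstream=False` the function returns `False` and neither counted method is called. With a previous instance whose dependents are not done, it returns `True` after calling each method once.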