Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/22 21:37:34 UTC
[GitHub] [airflow] tanelk opened a new pull request, #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
tanelk opened a new pull request, #23181:
URL: https://github.com/apache/airflow/pull/23181
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
closes: #19222
Alternative to #22374
#22374 explains the issue well, but that approach would limit the mini scheduler to only the most basic trigger rules.
---
**^ Add meaningful description above**
Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb merged pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
ashb merged PR #23181:
URL: https://github.com/apache/airflow/pull/23181
[GitHub] [airflow] tanelk commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
tanelk commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r856590083
##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
assert failed_deps[0].dep_name == "Previous Dagrun State"
assert not failed_deps[0].passed
+ @pytest.mark.parametrize(
+ "exception, trigger_rule",
+ [
+ (AirflowFailException(), TriggerRule.ALL_DONE),
+ (AirflowFailException(), TriggerRule.ALL_FAILED),
+ (AirflowSkipException(), TriggerRule.ALL_DONE),
+ (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+ (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
Review Comment:
These pass even without the PR. I initially tried to recreate the issue with these, but they are now left in just to improve coverage.
[GitHub] [airflow] tanelk commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
tanelk commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r856590207
##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
assert failed_deps[0].dep_name == "Previous Dagrun State"
assert not failed_deps[0].passed
+ @pytest.mark.parametrize(
+ "exception, trigger_rule",
+ [
+ (AirflowFailException(), TriggerRule.ALL_DONE),
+ (AirflowFailException(), TriggerRule.ALL_FAILED),
+ (AirflowSkipException(), TriggerRule.ALL_DONE),
+ (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+ (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
+ """
+ In these cases D is running, so no decision can be made about C.
+ """
+
+ def raise_():
+ raise exception
+
+ session = settings.Session()
+
+ with dag_maker(catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=raise_)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True, trigger_rule=trigger_rule)
+ task_d = PythonOperator(task_id='D', python_callable=lambda: True)
+ task_a >> task_b >> task_c
+ task_d >> task_c
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.QUEUED)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.NONE)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.NONE)
+ ti_d = TaskInstance(task_d, run_id=dr.run_id, state=State.RUNNING)
+
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti_d)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ job1.run()
+
+ ti_b.refresh_from_db(session)
+ ti_c.refresh_from_db(session)
+ assert ti_b.state in (State.SKIPPED, State.UPSTREAM_FAILED)
+ assert ti_c.state == State.NONE
+ assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
+
+ failed_deps = list(ti_c.get_failed_dep_statuses(session=session))
+ assert len(failed_deps) == 1
+ assert failed_deps[0].dep_name == "Trigger Rule"
+ assert not failed_deps[0].passed
+
+ session.rollback()
+
+ @pytest.mark.parametrize(
+ "trigger_rule",
+ [
+ TriggerRule.ONE_SUCCESS,
+ TriggerRule.ALL_SKIPPED,
+ TriggerRule.NONE_FAILED,
+ TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_branch_python_operator(self, trigger_rule, caplog, dag_maker):
Review Comment:
This fails without the PR.
[GitHub] [airflow] tanelk commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
tanelk commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r856589545
##########
airflow/models/dag.py:
##########
@@ -1964,7 +1964,13 @@ def partial_subset(
tasks, in addition to matched tasks.
:param include_upstream: Include all upstream tasks of matched tasks,
in addition to matched tasks.
+ :param include_direct_upstream: Include all tasks directly upstream of matched
+ and downstream (if include_downstream = True) tasks. Cannot be used when
+ include_upstream = True, as it would be a no-op with extra overhead.
Review Comment:
Before, it was just "Include all tasks directly upstream of matched"; now it also has the "and downstream" part.
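The documented behaviour can be sketched on a toy dependency graph. This is a hypothetical, plain-Python illustration of the docstring's semantics only — the task ids, helper names, and graph here are made up and are not Airflow's implementation:

```python
# Downstream adjacency: task -> tasks that depend on it (hypothetical graph).
downstream = {
    "A": ["B"],
    "B": ["C"],
    "D": ["C"],
    "C": [],
}

def flat_downstream(task):
    """All transitive downstream tasks of `task`."""
    result = []
    stack = list(downstream[task])
    while stack:
        t = stack.pop()
        if t not in result:
            result.append(t)
            stack.extend(downstream[t])
    return result

def direct_upstream(task):
    """Tasks with an edge pointing directly at `task`."""
    return [u for u, ds in downstream.items() if task in ds]

def partial_subset(matched, include_downstream, include_direct_upstream):
    also_include = []
    for t in matched:
        if include_downstream:
            also_include.extend(flat_downstream(t))
    selected = matched + also_include
    extra = []
    if include_direct_upstream:
        # Per the updated docstring: direct upstreams of the matched tasks
        # AND (when include_downstream is set) of their downstream tasks.
        for t in selected:
            extra.extend(u for u in direct_upstream(t) if u not in selected)
    return sorted(set(selected + extra))

# Matching "B" with downstream included pulls in C, and then both A (direct
# upstream of B) and D (direct upstream of C).
```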
[GitHub] [airflow] github-actions[bot] commented on pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
github-actions[bot] commented on PR #23181:
URL: https://github.com/apache/airflow/pull/23181#issuecomment-1109648236
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] uranusjr commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857290031
##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
assert failed_deps[0].dep_name == "Previous Dagrun State"
assert not failed_deps[0].passed
+ @pytest.mark.parametrize(
+ "exception, trigger_rule",
+ [
+ (AirflowFailException(), TriggerRule.ALL_DONE),
+ (AirflowFailException(), TriggerRule.ALL_FAILED),
+ (AirflowSkipException(), TriggerRule.ALL_DONE),
+ (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+ (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
+ """
+ In these cases D is running, so no decision can be made about C.
+ """
+
+ def raise_():
+ raise exception
+
+ session = settings.Session()
Review Comment:
You can use the `session` fixture here instead, which automatically calls `rollback` on test teardown.
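A hypothetical sketch of how such a yield-style pytest fixture works (this is not Airflow's actual fixture, which lives in the test suite's conftest): everything after the `yield` is teardown, and pytest runs it even when the test body raises, so the rollback is guaranteed.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session (illustration only)."""

    def __init__(self):
        self.rolled_back = False

    def rollback(self):
        self.rolled_back = True


def session():  # decorated with @pytest.fixture in real test code
    s = FakeSession()
    try:
        yield s  # the test runs while the fixture is suspended here
    finally:
        s.rollback()  # teardown: runs whether the test passed or failed


# Driving the generator by hand mimics pytest's setup/teardown protocol:
gen = session()
s = next(gen)        # setup: the fixture value handed to the test
try:
    assert 1 == 2    # the test body fails...
except AssertionError:
    pass
gen.close()          # ...but pytest still finalizes the fixture
assert s.rolled_back  # rollback happened despite the failing assert
```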
##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
assert failed_deps[0].dep_name == "Previous Dagrun State"
assert not failed_deps[0].passed
+ @pytest.mark.parametrize(
+ "exception, trigger_rule",
+ [
+ (AirflowFailException(), TriggerRule.ALL_DONE),
+ (AirflowFailException(), TriggerRule.ALL_FAILED),
+ (AirflowSkipException(), TriggerRule.ALL_DONE),
+ (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+ (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+ ],
+ )
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
+ """
+ In these cases D is running, so no decision can be made about C.
+ """
+
+ def raise_():
+ raise exception
+
+ session = settings.Session()
+
+ with dag_maker(catchup=False) as dag:
+ task_a = PythonOperator(task_id='A', python_callable=raise_)
+ task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+ task_c = PythonOperator(task_id='C', python_callable=lambda: True, trigger_rule=trigger_rule)
+ task_d = PythonOperator(task_id='D', python_callable=lambda: True)
+ task_a >> task_b >> task_c
+ task_d >> task_c
+
+ dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+ ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.QUEUED)
+ ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.NONE)
+ ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.NONE)
+ ti_d = TaskInstance(task_d, run_id=dr.run_id, state=State.RUNNING)
+
+ session.merge(ti_a)
+ session.merge(ti_b)
+ session.merge(ti_c)
+ session.merge(ti_d)
+ session.flush()
+
+ job1 = LocalTaskJob(task_instance=ti_a, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ job1.run()
+
+ ti_b.refresh_from_db(session)
+ ti_c.refresh_from_db(session)
+ assert ti_b.state in (State.SKIPPED, State.UPSTREAM_FAILED)
+ assert ti_c.state == State.NONE
+ assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
+
+ failed_deps = list(ti_c.get_failed_dep_statuses(session=session))
+ assert len(failed_deps) == 1
+ assert failed_deps[0].dep_name == "Trigger Rule"
+ assert not failed_deps[0].passed
+
+ session.rollback()
Review Comment:
… because, due to how tests run, this won't be called if any of the asserts fail.
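A minimal, self-contained illustration of that point: a statement placed after a failing assert is never reached, so cleanup written inline at the end of a test body is silently skipped whenever the test fails.

```python
def trailing_cleanup_runs():
    state = {"rolled_back": False}

    def body():
        assert 1 == 2                # the test fails here...
        state["rolled_back"] = True  # ...so this "cleanup" never executes

    try:
        body()
    except AssertionError:
        pass  # the test runner records the failure
    return state["rolled_back"]
```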
[GitHub] [airflow] uranusjr commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857270983
##########
airflow/models/dag.py:
##########
@@ -1984,15 +1987,18 @@ def partial_subset(
also_include.extend(t.get_flat_relatives(upstream=False))
if include_upstream:
also_include.extend(t.get_flat_relatives(upstream=True))
- elif include_direct_upstream:
+
+ direct_upstream: List[Operator] = []
+ if include_direct_upstream:
+ for t in matched_tasks + also_include:
upstream = (u for u in t.upstream_list if isinstance(u, (BaseOperator, MappedOperator)))
- also_include.extend(upstream)
+ direct_upstream.extend(upstream)
# Compiling the unique list of tasks that made the cut
# Make sure to not recursively deepcopy the dag while copying the task
dag.task_dict = {
t.task_id: copy.deepcopy(t, {id(t.dag): dag}) # type: ignore
- for t in matched_tasks + also_include
+ for t in matched_tasks + also_include + direct_upstream
Review Comment:
We should use `itertools.chain` here.
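A quick sketch of what the suggestion buys: `itertools.chain` walks several lists lazily, without first allocating one big concatenated list, which is all the dict comprehension in the diff actually needs. The values below are stand-in strings for what would be Operator objects.

```python
import itertools

# Stand-in task ids; in the real code these are Operator instances.
matched_tasks = ["C"]
also_include = ["B"]
direct_upstream = ["A", "D"]

# `+` builds an intermediate list holding every element before iteration:
concatenated = matched_tasks + also_include + direct_upstream

# `chain` yields the same elements, one source list at a time, lazily:
chained = list(itertools.chain(matched_tasks, also_include, direct_upstream))

assert chained == concatenated == ["C", "B", "A", "D"]
```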
[GitHub] [airflow] uranusjr commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution
uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857270875
##########
airflow/models/dag.py:
##########
@@ -1984,15 +1987,18 @@ def partial_subset(
also_include.extend(t.get_flat_relatives(upstream=False))
if include_upstream:
also_include.extend(t.get_flat_relatives(upstream=True))
- elif include_direct_upstream:
+
+ direct_upstream: List[Operator] = []
Review Comment:
```suggestion
direct_upstreams: List[Operator] = []
```