Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/22 21:37:34 UTC

[GitHub] [airflow] tanelk opened a new pull request, #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

tanelk opened a new pull request, #23181:
URL: https://github.com/apache/airflow/pull/23181

   
   closes: #19222
   Alternative to #22374
   
   #22374 explains the issue well, but that approach would limit the mini scheduler to only the most basic trigger rules.
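   
   For illustration, a minimal sketch of the DAG shape the issue describes (names and operator choice are my own, not code from this PR): `C` looks at both branches through its trigger rule, so the mini scheduler that runs right after `A` finishes must not decide `C`'s fate while `D` is still running.
   
   ```python
   # Hypothetical example only - the DAG shape behind the issue, not code from this PR.
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.trigger_rule import TriggerRule
   
   with DAG(
       dag_id="mini_scheduler_sketch",
       start_date=datetime(2022, 1, 1),
       schedule_interval=None,
   ) as dag:
       task_a = PythonOperator(task_id="A", python_callable=lambda: True)
       task_b = PythonOperator(task_id="B", python_callable=lambda: True)
       # C has a non-basic trigger rule, so its state depends on D as well as B.
       task_c = PythonOperator(
           task_id="C",
           python_callable=lambda: True,
           trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
       )
       task_d = PythonOperator(task_id="D", python_callable=lambda: True)
   
       task_a >> task_b >> task_c
       task_d >> task_c
   ```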
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of a fundamental code change, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards-incompatible changes, please leave a note in a newsfragment file, named `{pr_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   




[GitHub] [airflow] ashb merged pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
ashb merged PR #23181:
URL: https://github.com/apache/airflow/pull/23181




[GitHub] [airflow] tanelk commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r856590083


##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
         assert failed_deps[0].dep_name == "Previous Dagrun State"
         assert not failed_deps[0].passed
 
+    @pytest.mark.parametrize(
+        "exception, trigger_rule",
+        [
+            (AirflowFailException(), TriggerRule.ALL_DONE),
+            (AirflowFailException(), TriggerRule.ALL_FAILED),
+            (AirflowSkipException(), TriggerRule.ALL_DONE),
+            (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+            (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+        ],
+    )
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):

Review Comment:
   These pass even without the PR. I initially tried to recreate the issue with these, but left them in just to improve coverage.





[GitHub] [airflow] tanelk commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r856590207


##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
         assert failed_deps[0].dep_name == "Previous Dagrun State"
         assert not failed_deps[0].passed
 
+    @pytest.mark.parametrize(
+        "exception, trigger_rule",
+        [
+            (AirflowFailException(), TriggerRule.ALL_DONE),
+            (AirflowFailException(), TriggerRule.ALL_FAILED),
+            (AirflowSkipException(), TriggerRule.ALL_DONE),
+            (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+            (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+        ],
+    )
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
+        """
+        In these cases D is running, so no decision can be made about C.
+        """
+
+        def raise_():
+            raise exception
+
+        session = settings.Session()
+
+        with dag_maker(catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=raise_)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True, trigger_rule=trigger_rule)
+            task_d = PythonOperator(task_id='D', python_callable=lambda: True)
+            task_a >> task_b >> task_c
+            task_d >> task_c
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.QUEUED)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.NONE)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.NONE)
+        ti_d = TaskInstance(task_d, run_id=dr.run_id, state=State.RUNNING)
+
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti_d)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.run()
+
+        ti_b.refresh_from_db(session)
+        ti_c.refresh_from_db(session)
+        assert ti_b.state in (State.SKIPPED, State.UPSTREAM_FAILED)
+        assert ti_c.state == State.NONE
+        assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
+
+        failed_deps = list(ti_c.get_failed_dep_statuses(session=session))
+        assert len(failed_deps) == 1
+        assert failed_deps[0].dep_name == "Trigger Rule"
+        assert not failed_deps[0].passed
+
+        session.rollback()
+
+    @pytest.mark.parametrize(
+        "trigger_rule",
+        [
+            TriggerRule.ONE_SUCCESS,
+            TriggerRule.ALL_SKIPPED,
+            TriggerRule.NONE_FAILED,
+            TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+        ],
+    )
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_branch_python_operator(self, trigger_rule, caplog, dag_maker):

Review Comment:
   This fails without the PR.





[GitHub] [airflow] tanelk commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r856589545


##########
airflow/models/dag.py:
##########
@@ -1964,7 +1964,13 @@ def partial_subset(
             tasks, in addition to matched tasks.
         :param include_upstream: Include all upstream tasks of matched tasks,
             in addition to matched tasks.
+        :param include_direct_upstream: Include all tasks directly upstream of matched
+            and downstream (if include_downstream = True) tasks. Cannot be used when
+            include_upstream = True - it would be a no-op with extra overhead.

Review Comment:
   Before, this read "Include all tasks directly upstream of matched"; now it also has the "and downstream" part.
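   
   To make the documented combination concrete, a rough usage sketch (the `dag` object and task id are hypothetical, not from this PR):
   
   ```python
   # Hedged sketch: with include_downstream=True, the direct upstreams of the
   # matched task *and* of its included downstream tasks end up in the subset.
   subset = dag.partial_subset(
       task_ids_or_regex=["B"],
       include_downstream=True,
       include_upstream=False,
       include_direct_upstream=True,
   )
   ```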





[GitHub] [airflow] github-actions[bot] commented on pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23181:
URL: https://github.com/apache/airflow/pull/23181#issuecomment-1109648236

   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] uranusjr commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857290031


##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
         assert failed_deps[0].dep_name == "Previous Dagrun State"
         assert not failed_deps[0].passed
 
+    @pytest.mark.parametrize(
+        "exception, trigger_rule",
+        [
+            (AirflowFailException(), TriggerRule.ALL_DONE),
+            (AirflowFailException(), TriggerRule.ALL_FAILED),
+            (AirflowSkipException(), TriggerRule.ALL_DONE),
+            (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+            (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+        ],
+    )
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
+        """
+        In these cases D is running, so no decision can be made about C.
+        """
+
+        def raise_():
+            raise exception
+
+        session = settings.Session()

Review Comment:
   You can use the `session` fixture here instead, which automatically calls `rollback` on test teardown.



##########
tests/jobs/test_local_task_job.py:
##########
@@ -799,6 +800,121 @@ def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
         assert failed_deps[0].dep_name == "Previous Dagrun State"
         assert not failed_deps[0].passed
 
+    @pytest.mark.parametrize(
+        "exception, trigger_rule",
+        [
+            (AirflowFailException(), TriggerRule.ALL_DONE),
+            (AirflowFailException(), TriggerRule.ALL_FAILED),
+            (AirflowSkipException(), TriggerRule.ALL_DONE),
+            (AirflowSkipException(), TriggerRule.ALL_SKIPPED),
+            (AirflowSkipException(), TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS),
+        ],
+    )
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_skipped_and_failed(self, exception, trigger_rule, caplog, dag_maker):
+        """
+        In these cases D is running, so no decision can be made about C.
+        """
+
+        def raise_():
+            raise exception
+
+        session = settings.Session()
+
+        with dag_maker(catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=raise_)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True, trigger_rule=trigger_rule)
+            task_d = PythonOperator(task_id='D', python_callable=lambda: True)
+            task_a >> task_b >> task_c
+            task_d >> task_c
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.QUEUED)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.NONE)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.NONE)
+        ti_d = TaskInstance(task_d, run_id=dr.run_id, state=State.RUNNING)
+
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti_d)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.run()
+
+        ti_b.refresh_from_db(session)
+        ti_c.refresh_from_db(session)
+        assert ti_b.state in (State.SKIPPED, State.UPSTREAM_FAILED)
+        assert ti_c.state == State.NONE
+        assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text
+
+        failed_deps = list(ti_c.get_failed_dep_statuses(session=session))
+        assert len(failed_deps) == 1
+        assert failed_deps[0].dep_name == "Trigger Rule"
+        assert not failed_deps[0].passed
+
+        session.rollback()

Review Comment:
   … because, due to how tests run, this won’t be called if any of the asserts fail.
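   
   A rough sketch of what the fixture-based variant could look like (assumed shape, not the final code; `session` here is the test suite's pytest fixture mentioned above):
   
   ```python
   # Sketch only: take the `session` fixture instead of opening one by hand.
   # The explicit settings.Session() at the top and the session.rollback() at
   # the end both go away - teardown rolls back even if an assert fails mid-test.
   @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
   def test_mini_scheduler_works_with_skipped_and_failed(
       self, exception, trigger_rule, caplog, dag_maker, session
   ):
       def raise_():
           raise exception
   
       with dag_maker(catchup=False) as dag:
           ...  # same DAG setup, LocalTaskJob run, and assertions as in the diff above
   ```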





[GitHub] [airflow] uranusjr commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857270983


##########
airflow/models/dag.py:
##########
@@ -1984,15 +1987,18 @@ def partial_subset(
                 also_include.extend(t.get_flat_relatives(upstream=False))
             if include_upstream:
                 also_include.extend(t.get_flat_relatives(upstream=True))
-            elif include_direct_upstream:
+
+        direct_upstream: List[Operator] = []
+        if include_direct_upstream:
+            for t in matched_tasks + also_include:
                 upstream = (u for u in t.upstream_list if isinstance(u, (BaseOperator, MappedOperator)))
-                also_include.extend(upstream)
+                direct_upstream.extend(upstream)
 
         # Compiling the unique list of tasks that made the cut
         # Make sure to not recursively deepcopy the dag while copying the task
         dag.task_dict = {
             t.task_id: copy.deepcopy(t, {id(t.dag): dag})  # type: ignore
-            for t in matched_tasks + also_include
+            for t in matched_tasks + also_include + direct_upstream

Review Comment:
   We should use `itertools.chain` here.
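   
   Something like this sketch (names taken from the surrounding diff; import shown for completeness, not the merged code):
   
   ```python
   import copy
   import itertools
   
   # Chain the three lists lazily instead of concatenating them just to iterate once.
   dag.task_dict = {
       t.task_id: copy.deepcopy(t, {id(t.dag): dag})  # type: ignore
       for t in itertools.chain(matched_tasks, also_include, direct_upstream)
   }
   ```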





[GitHub] [airflow] uranusjr commented on a diff in pull request #23181: Fix tasks being wrongly skipped by schedule_after_task_execution

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23181:
URL: https://github.com/apache/airflow/pull/23181#discussion_r857270875


##########
airflow/models/dag.py:
##########
@@ -1984,15 +1987,18 @@ def partial_subset(
                 also_include.extend(t.get_flat_relatives(upstream=False))
             if include_upstream:
                 also_include.extend(t.get_flat_relatives(upstream=True))
-            elif include_direct_upstream:
+
+        direct_upstream: List[Operator] = []

Review Comment:
   ```suggestion
           direct_upstreams: List[Operator] = []
   ```


