Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/07 00:45:49 UTC

[GitHub] [airflow] yuqian90 commented on issue #19222: none_failed_min_one_success trigger rule not working with BranchPythonOperator in certain cases.

yuqian90 commented on issue #19222:
URL: https://github.com/apache/airflow/issues/19222#issuecomment-1030960341


   The issue reported here started after this change by @kaxil: [Fix mini scheduler not respecting wait_for_downstream dep (#18338)](https://github.com/apache/airflow/pull/18338). Whether `BranchPythonOperator` returns empty or non-existent branches is irrelevant to this issue.
   
   How to reproduce:
   
   ```python
   import pendulum
   
   from airflow.models import DAG
   from airflow.operators.python import BranchPythonOperator, PythonOperator
   from airflow.sensors.python import PythonSensor
   from airflow.utils.trigger_rule import TriggerRule
   
   with DAG(
       dag_id="example_wrong_skip",
       schedule_interval="@daily",
       catchup=False,
       start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
   ) as dag:
       # Always follows task_b, so task_a gets skipped.
       branch = BranchPythonOperator(task_id="branch", python_callable=lambda: "task_b")
       task_a = PythonOperator(task_id="task_a", python_callable=lambda: True)
       task_b = PythonOperator(task_id="task_b", python_callable=lambda: True)
       # Never returns True, so task_c keeps running until the sensor times out.
       task_c = PythonSensor(task_id="task_c", python_callable=lambda: False)
       # Its trigger rule depends on both task_a and task_c.
       task_d = PythonOperator(
           task_id="task_d",
           python_callable=lambda: True,
           trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
       )
   
       branch >> [task_a, task_b]
       [task_a, task_c] >> task_d
   ```
   ![Screen Shot 2022-02-06 at 9 20 51 PM](https://user-images.githubusercontent.com/6637585/152707755-771f3ccd-87ea-4efa-b7e6-5cf9d5b2c268.png)
   
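   Observe that `task_d`, which has the `none_failed_min_one_success` trigger rule, is skipped before `task_c` has even finished. This violates the semantics of `none_failed_min_one_success`: while `task_c` is still running it may yet succeed or fail, so the state of `task_d` cannot be decided yet.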
   
   This happens because #18338 changed the following `partial_subset` call to use `include_downstream=True`:
   
   ```python
   partial_dag = task.dag.partial_subset(
       task.downstream_task_ids,
       include_downstream=True,
       include_upstream=False,
       include_direct_upstream=True,
   )
   ```
   
   As a result, the `partial_dag` built by the "mini scheduler" now includes all downstream tasks of the task that just finished, including indirect downstream tasks.
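To make the effect concrete, here is a hypothetical, heavily simplified model of the subset selection on the reproducing DAG, using a plain dict of edges instead of Airflow objects. The function `partial_subset` below is not Airflow's `DAG.partial_subset`; its rules are assumptions that only mirror the description in this comment, in particular the assumption that direct upstreams are added only for the finished task's direct downstream tasks.

```python
# Hypothetical, simplified model of the mini scheduler's subset selection.
# This is NOT Airflow's DAG.partial_subset; names and rules are assumptions
# that only mirror the behaviour described in this comment.
from typing import Dict, Set

# Edges of the reproducing DAG: task_id -> direct downstream task_ids.
DOWNSTREAM: Dict[str, Set[str]] = {
    "branch": {"task_a", "task_b"},
    "task_a": {"task_d"},
    "task_b": set(),
    "task_c": {"task_d"},
    "task_d": set(),
}


def upstream_of(task_id: str) -> Set[str]:
    """Direct upstream tasks of task_id in the full DAG."""
    return {up for up, downs in DOWNSTREAM.items() if task_id in downs}


def all_downstream(task_id: str) -> Set[str]:
    """Direct and indirect downstream tasks of task_id."""
    seen: Set[str] = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen


def partial_subset(finished: str, include_downstream: bool) -> Dict[str, Set[str]]:
    """Map each task kept in the subset to the upstream ids it can still see."""
    matched = set(DOWNSTREAM[finished])  # direct downstream of the finished task
    also_include: Set[str] = set()
    if include_downstream:  # the flag that #18338 switched to True
        for task_id in matched:
            also_include |= all_downstream(task_id)
    # Assumption: direct upstreams are added only for the matched tasks.
    direct_upstreams: Set[str] = set()
    for task_id in matched:
        direct_upstreams |= upstream_of(task_id)
    kept = matched | also_include | direct_upstreams
    # Edges to tasks outside the subset are dropped.
    return {task_id: upstream_of(task_id) & kept for task_id in kept}


before = partial_subset("branch", include_downstream=False)
after = partial_subset("branch", include_downstream=True)
print(sorted(before))            # ['branch', 'task_a', 'task_b']
print(sorted(after))             # ['branch', 'task_a', 'task_b', 'task_d']
print(sorted(after["task_d"]))   # ['task_a'] (task_c is invisible)
```

In this model `branch` also lands in the subset, as the direct upstream of `task_a` and `task_b`. With `include_downstream=False`, `task_d` is not in the subset at all, so the mini scheduler never looks at it.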
   
   In the reproducing example, once `branch` finishes, the mini scheduler creates a `partial_dag` that includes `task_a`, `task_b` and `task_d`, but not `task_c`, because `task_c` is not downstream of `branch`. Looking only at this `partial_dag`, the "mini scheduler" concludes that `task_d` can be skipped, because its only upstream task within `partial_dag`, `task_a`, is in the skipped state. This happens in `DagRun._get_ready_tis()` when it calls `st.are_dependencies_met()`.
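The wrong decision can be illustrated with a hypothetical, simplified evaluator for `none_failed_min_one_success`. It is not Airflow's `TriggerRuleDep`, which handles more states and cases; as an assumption of this model, it decides only once every upstream task is in a finished state, and the state strings are plain labels chosen for illustration.

```python
# Hypothetical, simplified evaluation of the none_failed_min_one_success rule.
# This is NOT Airflow's TriggerRuleDep; the state strings are plain labels.
from typing import Dict

FINISHED = {"success", "skipped", "failed", "upstream_failed"}


def evaluate_none_failed_min_one_success(upstream_states: Dict[str, str]) -> str:
    """Decide what a task with this trigger rule should do.

    Returns "wait", "upstream_failed", "run" or "skip".
    """
    states = list(upstream_states.values())
    # Assumption of this model: decide only once every upstream task is finished.
    if not all(state in FINISHED for state in states):
        return "wait"
    if any(state in ("failed", "upstream_failed") for state in states):
        return "upstream_failed"
    if any(state == "success" for state in states):
        return "run"
    return "skip"  # nothing failed, but nothing succeeded either


# What the mini scheduler sees in partial_dag vs. what the full DAG contains.
partial_view = {"task_a": "skipped"}
full_view = {"task_a": "skipped", "task_c": "running"}
print(evaluate_none_failed_min_one_success(partial_view))  # skip
print(evaluate_none_failed_min_one_success(full_view))     # wait
```

Seen through `partial_dag`, `task_a` is the only upstream task, it is finished, and nothing succeeded, so the model returns `skip`; with `task_c` visible and still running, the same rule returns `wait`.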
   
   
   A temporary workaround is to set `schedule_after_task_execution` to `False` in the `[scheduler]` section of `airflow.cfg`. This disables the "mini scheduler" that runs after each task finishes, and with it the incorrect skip.
   
   ```ini
   [scheduler]
   schedule_after_task_execution = False
   ```
   
   A proper fix would be to make the "mini scheduler" evaluate `trigger_rule` the same way the full scheduler does, taking into account upstream tasks that are not part of `partial_dag`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
