Posted to commits@airflow.apache.org by "Jared (Jira)" <ji...@apache.org> on 2020/01/29 16:34:00 UTC

[jira] [Commented] (AIRFLOW-6163) BaseSensorOperator with soft_fail=True does not take into account downstream trigger_rule

    [ https://issues.apache.org/jira/browse/AIRFLOW-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026020#comment-17026020 ] 

Jared commented on AIRFLOW-6163:
--------------------------------

This is an issue for us too (we're on 1.10.7). We have a workflow with one task at the end that waits on various upstream tasks to finish. We want to run that task as soon as the upstream tasks finish, but if they don't finish within a certain time frame, we want to run the downstream task anyway. We should be able to accomplish this with external task sensors set to soft fail after a certain time and the downstream task having a trigger rule of "none_failed". But we can't, because `BaseSensorOperator` forces all downstream tasks to skip when it soft fails, regardless of their trigger rules.
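The intended design can be sketched as a small, hypothetical timing model. This is plain Python, not Airflow code, and the function name and minute-based times are assumptions. Each soft-fail sensor is done either when its upstream task finishes or when its timeout expires, whichever comes first. A `none_failed` downstream task treats success and skip alike, so it would become runnable once every sensor is done.

{code:python}
from typing import List, Optional


def downstream_start_time(upstream_finish: List[Optional[float]], timeout: float) -> float:
    """Hypothetical model of the intended workflow (not Airflow code).

    Each entry in upstream_finish is the minute at which an upstream task
    finishes, or None if it never finishes. The matching sensor succeeds at
    that minute or soft-fails (is skipped) at `timeout`, whichever is first.
    A downstream task with trigger_rule="none_failed" counts both outcomes
    as acceptable, so it can start once the last sensor is done.
    """
    sensor_done = [
        timeout if finish is None else min(finish, timeout)
        for finish in upstream_finish
    ]
    return max(sensor_done)


# All upstream tasks finish early: downstream starts when the slowest one does.
print(downstream_start_time([10, 25, 40], timeout=60))    # 40
# One upstream task never finishes: downstream starts at the deadline.
print(downstream_start_time([10, None, 40], timeout=60))  # 60
{code}

Taking the `max` over `min(finish, timeout)` captures the point of the design: a single slow or missing upstream task delays the downstream task by at most the timeout.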

Here's another example DAG that reproduces this behavior:

 
{code:python}
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='test_none_failed',
    start_date=datetime.datetime(2019, 1, 28),
    schedule_interval='0 8 * * *')

# The external DAG/task does not exist, so this sensor times out and,
# because soft_fail=True, ends up skipped instead of failed.
task_will_get_skipped = ExternalTaskSensor(
    dag=dag,
    task_id='task_will_get_skipped',
    external_dag_id='dag_does_not_exist',
    external_task_id='task_does_not_exist',
    timeout=1,
    retries=0,
    soft_fail=True)

# none_failed should let this task run after a skipped upstream,
# but the soft-failing sensor skips it anyway.
task_should_run_but_will_get_skipped = DummyOperator(
    dag=dag,
    trigger_rule='none_failed',
    task_id='task_should_run_but_will_get_skipped')

task_will_get_skipped >> task_should_run_but_will_get_skipped
{code}
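To make the sensor side concrete, here is a simplified, hypothetical model of a poke loop with `timeout` and `soft_fail`. It is not Airflow's `BaseSensorOperator` implementation; the function name, the returned state strings and the injectable fake clock are assumptions made so the sketch runs instantly. It shows why the sensor above ends up skipped: the external task never appears, the timeout expires, and `soft_fail=True` turns the timeout into a skip rather than a failure.

{code:python}
from typing import Callable


def run_sensor(poke: Callable[[], bool], timeout: float, poke_interval: float,
               soft_fail: bool, clock: Callable[[], float],
               sleep: Callable[[float], None]) -> str:
    """Hypothetical poke loop (not Airflow code) returning a final state."""
    started_at = clock()
    while not poke():
        if clock() - started_at > timeout:
            # On timeout, soft_fail decides between "skipped" and "failed".
            return "skipped" if soft_fail else "failed"
        sleep(poke_interval)
    return "success"


class FakeClock:
    """Fake time source so the example runs without real sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def never_ready() -> bool:
    # Stands in for an external DAG/task that never shows up.
    return False


clock = FakeClock()
print(run_sensor(never_ready, timeout=1, poke_interval=60, soft_fail=True,
                 clock=clock.time, sleep=clock.sleep))  # skipped
{code}

Note that in this model the sensor only reports "skipped" for itself; which downstream tasks then run is a separate decision, and that decision is where the reported bug lies.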

> BaseSensorOperator with soft_fail=True does not take into account downstream trigger_rule
> -----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6163
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6163
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators, scheduler
>    Affects Versions: 1.10.6
>            Reporter: Damian Momot
>            Priority: Major
>
> It looks like BaseSensorOperator skips all downstream tasks regardless of their trigger_rule setting. Using the following example, I would expect "default_task" to be skipped, but "none_failed_task" to run. Unfortunately, both tasks are skipped.
>  
> {code:python}
> from datetime import datetime, timedelta
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from airflow import DAG
> from airflow.contrib.sensors.python_sensor import PythonSensor
> dag = DAG(
>     dag_id="trigger_rule_test",
>     schedule_interval=timedelta(minutes=1),
>     start_date=(datetime.today() - timedelta(minutes=2))
> )
> soft_fail_sensor = PythonSensor(
>     task_id="soft_fail_sensor",
>     soft_fail=True,
>     timeout=5,
>     poke_interval=1,
>     retries=0,
>     python_callable=lambda *args, **kwargs: False,
>     dag=dag
> )
> default_task = DummyOperator(
>     task_id="default_task",
>     dag=dag
> )
> none_failed_task = DummyOperator(
>     task_id="none_failed_task",
>     trigger_rule=TriggerRule.NONE_FAILED,
>     dag=dag
> )
> default_task.set_upstream(soft_fail_sensor)
> none_failed_task.set_upstream(soft_fail_sensor)
> {code}
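The expectation in the report can be written down as a simplified, hypothetical model of trigger-rule evaluation. It is not the scheduler's implementation. It covers only the two rules used in the report, following their documented meaning: `all_success` requires every upstream task to succeed, while `none_failed` only requires that no upstream task is failed or upstream_failed, so success and skipped both count. It assumes every upstream task has already finished, and it does not check how a particular Airflow version actually behaves when all upstream tasks are skipped. The last part contrasts this with the reported behavior, where the soft-failing sensor marks every downstream task skipped directly.

{code:python}
from typing import List


def decide(trigger_rule: str, upstream_states: List[str]) -> str:
    """Return "run", "skipped" or "upstream_failed" for a downstream task.

    upstream_states must be terminal: success, skipped, failed or
    upstream_failed. Only all_success and none_failed are modelled.
    """
    failed = any(s in ("failed", "upstream_failed") for s in upstream_states)
    if trigger_rule == "all_success":
        if failed:
            return "upstream_failed"
        if any(s == "skipped" for s in upstream_states):
            return "skipped"
        return "run"
    if trigger_rule == "none_failed":
        # Success and skipped both satisfy none_failed.
        return "upstream_failed" if failed else "run"
    raise ValueError(f"rule not modelled: {trigger_rule}")


downstream_rules = {"default_task": "all_success", "none_failed_task": "none_failed"}

# Expected: the sensor is skipped and each downstream task follows its own rule.
expected = {task: decide(rule, ["skipped"]) for task, rule in downstream_rules.items()}

# Reported: the soft-failing sensor skips every downstream task directly,
# so the trigger rules are never consulted.
reported = {task: "skipped" for task in downstream_rules}

print(expected)  # {'default_task': 'skipped', 'none_failed_task': 'run'}
print(reported)  # {'default_task': 'skipped', 'none_failed_task': 'skipped'}
{code}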



--
This message was sent by Atlassian Jira
(v8.3.4#803005)