Posted to dev@airflow.apache.org by yp...@mailed.ro on 2017/01/04 15:15:00 UTC

Wrong DAG state after failure inside a branch

Hello

I just became aware that a failure inside a branch (BranchPythonOperator)
sets the DAG state to success despite the task failure.
Airflow sets all downstream tasks to the skipped state because of the
trigger_rule needed for the branching mechanism, and since every leaf task
then ends up skipped rather than failed, Airflow wrongly marks the DAG run
as successful. Please find below an example DAG that triggers this
behaviour.
Is there any way to make tasks run more reliably inside a branch?


import sys
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import (PythonOperator,
                                               BranchPythonOperator)

DAG_NAME = 'branch_issue'

dag = DAG(DAG_NAME + '_v1', start_date=datetime(2017, 1, 4, 9, 10),
          schedule_interval=timedelta(hours=1))


# Always pick the 't1' branch, so 't2' gets skipped.
branch1 = BranchPythonOperator(
    task_id='branch1',
    python_callable=lambda: 't1',
    dag=dag)

# 't1' simulates a task failure inside the chosen branch.
t1 = PythonOperator(
    task_id='t1',
    python_callable=lambda: sys.exit(1),
    dag=dag)
t1.set_upstream(branch1)

t2 = PythonOperator(
    task_id='t2',
    python_callable=lambda: True,
    dag=dag)
t2.set_upstream(branch1)

# Join task: 'one_success' is needed so the skipped branch does not
# block the join, but when t1 fails the rule is never satisfied and
# the skip cascades downstream.
process1 = PythonOperator(
    task_id='process1',
    python_callable=lambda: True,
    trigger_rule='one_success',
    dag=dag)
process1.set_upstream(t1)
process1.set_upstream(t2)

process2 = PythonOperator(
    task_id='process2',
    python_callable=lambda: True,
    dag=dag)
process2.set_upstream(process1)

process3 = PythonOperator(
    task_id='process3',
    python_callable=lambda: True,
    dag=dag)
process3.set_upstream(process2)
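
For what it is worth, a possible workaround, as a minimal untested sketch:
newer Airflow versions provide a 'none_failed' trigger rule for exactly
this join pattern (it is not available in the release discussed here).
With it, the join runs when no upstream task failed (skipped is fine), but
is marked upstream_failed when the chosen branch fails, so the DAG run
fails instead of being skipped to success:

import sys
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import (PythonOperator,
                                               BranchPythonOperator)

dag = DAG('branch_issue_workaround',
          start_date=datetime(2017, 1, 4, 9, 10),
          schedule_interval=timedelta(hours=1))

branch1 = BranchPythonOperator(
    task_id='branch1',
    python_callable=lambda: 't1',
    dag=dag)

# The failing branch task, as in the example above.
t1 = PythonOperator(
    task_id='t1',
    python_callable=lambda: sys.exit(1),
    dag=dag)
t1.set_upstream(branch1)

t2 = PythonOperator(
    task_id='t2',
    python_callable=lambda: True,
    dag=dag)
t2.set_upstream(branch1)

# 'none_failed' ignores skipped upstreams, so the un-chosen branch does
# not block the join; if t1 fails, process1 becomes upstream_failed and
# the DAG run is marked failed rather than successful.
process1 = PythonOperator(
    task_id='process1',
    python_callable=lambda: True,
    trigger_rule='none_failed',
    dag=dag)
process1.set_upstream(t1)
process1.set_upstream(t2)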


At the moment, I want my privacy to be protected.
https://mytemp.email/

Re: Wrong DAG state after failure inside a branch

Posted by Alex Van Boxel <al...@vanboxel.be>.
Hey ypwdr,

I have a pull request that will fix this problem. It works for me, but I
still need to fix the failing branch. Feel free to try it out and see if
it resolves your problem.


On Wed, Jan 4, 2017 at 4:14 PM <yp...@mailed.ro> wrote:

> Hello
>
> I just became aware that a failure inside a branch (BranchPythonOperator)
> sets the DAG state to success despite the task failure.
> [rest of the original message snipped]
>
-- 
  _/
_/ Alex Van Boxel