Posted to dev@airflow.apache.org by yp...@mailed.ro on 2017/01/04 15:15:00 UTC
Wrong DAG state after failure inside a branch
Hello

I just became aware that a failure inside a branch (BranchPythonOperator) sets
the DAG state to success despite the task failure.
Airflow sets all downstream tasks to the skipped state because of the
trigger_rule needed for the branching mechanism, and as a consequence it
wrongly sets the DAG state to success. Please find below an example DAG that
triggers this behaviour.
Is there any way to make tasks run more reliably inside a branch?
import sys
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import (
    BranchPythonOperator,
    PythonOperator,
)

DAG_NAME = 'branch_issue'

dag = DAG(DAG_NAME + '_v1', start_date=datetime(2017, 1, 4, 9, 10),
          schedule_interval=timedelta(hours=1))

branch1 = BranchPythonOperator(
    task_id='branch1',
    python_callable=lambda: 't1',  # always follow the t1 path
    dag=dag)

t1 = PythonOperator(
    task_id='t1',
    python_callable=lambda: sys.exit(1),  # simulate a task failure
    dag=dag)
t1.set_upstream(branch1)

t2 = PythonOperator(
    task_id='t2',
    python_callable=lambda: True,
    dag=dag)
t2.set_upstream(branch1)

process1 = PythonOperator(
    task_id='process1',
    python_callable=lambda: True,
    trigger_rule='one_success',  # join point of the two branch paths
    dag=dag)
process1.set_upstream(t1)
process1.set_upstream(t2)

process2 = PythonOperator(
    task_id='process2',
    python_callable=lambda: True,
    dag=dag)
process2.set_upstream(process1)

process3 = PythonOperator(
    task_id='process3',
    python_callable=lambda: True,
    dag=dag)
process3.set_upstream(process2)
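
The mechanics described above can be sketched in plain Python. This is only an
illustration of the reported behaviour, not Airflow's actual scheduler code:
the assumption it encodes is that the final DAG run state is derived from the
leaf task instances, and that a skipped leaf does not count as a failure.

```python
# Rough sketch of the reported behaviour; NOT Airflow's actual
# scheduler code, just an approximation of the state logic.

def dag_run_state(leaf_states):
    """Approximate how a DAG run's final state might be derived:
    if no leaf task ended up 'failed', the run counts as 'success'."""
    return 'failed' if 'failed' in leaf_states else 'success'

# In the example DAG the branch picks t1, t1 fails, and the downstream
# tasks never satisfy their trigger rules, so they end up 'skipped':
states = {
    'branch1': 'success',
    't1': 'failed',
    't2': 'skipped',
    'process1': 'skipped',  # one_success never fires: t1 failed, t2 skipped
    'process2': 'skipped',
    'process3': 'skipped',
}

# Only the leaf task (process3) feeds the DAG run state, so the
# failure of t1 is invisible and the run is marked 'success':
leaf_states = [states['process3']]
print(dag_run_state(leaf_states))  # -> success
```

This is why the failure is swallowed: the failed task is not a leaf, and every
leaf downstream of it was skipped rather than marked as upstream_failed.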
Re: Wrong DAG state after failure inside a branch
Posted by Alex Van Boxel <al...@vanboxel.be>.
Hey ypwdr,
I have a pull request that should fix this problem. It works for me, but I
still need to fix the failing branch. Be my guest to try it out and see if it
resolves your problem.
On Wed, Jan 4, 2017 at 4:14 PM <yp...@mailed.ro> wrote:
> [...]
--
_/
_/ Alex Van Boxel