Posted to commits@airflow.apache.org by "Conrad Lee (JIRA)" <ji...@apache.org> on 2017/07/18 06:34:00 UTC

[jira] [Updated] (AIRFLOW-1419) Trigger Rule not respected downstream of BranchPythonOperator

     [ https://issues.apache.org/jira/browse/AIRFLOW-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Conrad Lee updated AIRFLOW-1419:
--------------------------------
    Description: 
Let's consider the following DAG:
{noformat}
           _____________________
          /                     \
branch_op                        confluence_op
          \_______work_op_______/
{noformat}

This is implemented in the following code:


{code}
import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='branch_skip_problem',
    default_args=args,
    schedule_interval="@daily")

# The branch callable always selects work_op, so the direct
# branch_op -> confluence_op edge is never the chosen path.
branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=lambda: 'work_op',
    dag=dag)

work_op = DummyOperator(task_id='work_op', dag=dag)

# confluence_op should fire once all of its parents are done,
# whether they succeeded or were skipped.
confluence_op = DummyOperator(
    task_id='confluence_op',
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE)

branch_op.set_downstream(confluence_op)
branch_op.set_downstream(work_op)
work_op.set_downstream(confluence_op)
{code}


Note that branch_op is a BranchPythonOperator, that work_op and confluence_op are DummyOperators, and that confluence_op has its trigger_rule set to ALL_DONE.

In DAG runs where branch_op chooses to follow work_op, confluence_op never runs.  This doesn't seem right: confluence_op has two parents, and its trigger_rule is set so that it should run as soon as all of its parents are done, whether they succeeded or were skipped.
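
To make the failure concrete, here is a minimal sketch of how the resulting task states might be inspected after a run of the DAG above.  It assumes a local Airflow install whose metadata database is reachable through airflow.settings, and an execution date of 2017-07-01; adjust both to your setup:

{code}
# Sketch: query the metadata DB for the states of this run's tasks.
# Assumes the DAG above has already been run for execution_date 2017-07-01.
from datetime import datetime

from airflow import settings
from airflow.models import TaskInstance

session = settings.Session()
tis = (session.query(TaskInstance)
              .filter(TaskInstance.dag_id == 'branch_skip_problem',
                      TaskInstance.execution_date == datetime(2017, 7, 1))
              .all())
for ti in tis:
    print(ti.task_id, ti.state)
# Observed behavior per this report: work_op succeeds, but confluence_op
# is left 'skipped' even though its trigger_rule is ALL_DONE.
session.close()
{code}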

I know this example seems contrived, and that in practice there are better ways of conditionally executing work_op.  However, this is the minimal code that illustrates the problem.  You can imagine this problem cropping up in practice: originally there was a good reason to use the BranchPythonOperator, then time passed and someone modified one of the branches so that it no longer really contains any children, leaving a DAG that resembles the example.
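
For what it's worth, a possible interim workaround (a sketch, not a verified fix, assuming the brancher force-skips only its immediate unchosen children) is to give the empty branch its own explicit task, so that confluence_op is never a direct child of branch_op.  The skip_op task below is hypothetical and introduced only for illustration:

{code}
# Sketch of a possible workaround: rewire the example above so the
# "do nothing" outcome is an explicit task.  'skip_op' is hypothetical.
skip_op = DummyOperator(task_id='skip_op', dag=dag)

# This wiring replaces the three set_downstream calls in the example:
branch_op.set_downstream(work_op)
branch_op.set_downstream(skip_op)
work_op.set_downstream(confluence_op)
skip_op.set_downstream(confluence_op)

# The branch callable then returns either 'work_op' or 'skip_op', and
# confluence_op's ALL_DONE rule can be evaluated once both parents are
# done (one succeeded, one skipped).
{code}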


> Trigger Rule not respected downstream of BranchPythonOperator
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-1419
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1419
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.8.2
>            Reporter: Conrad Lee
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)