You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Greg Lira <hr...@gmail.com> on 2016/09/01 08:24:56 UTC

BranchPythonOperator and wait_for_downstream

Hi,

For some of our DAGs [where we clear and re-import the staging tables], having wait_for_downstream on the tasks is very important.
However, we have a BranchPythonOperator for happy and error path, so if the flow has successfully completed, the tasks of the error path as marked as skipped.
It seems that it doesn't work well with  wait_for_downstream, where airflow checks for the task instances with SUCCESS state only.
Is this 'by design'?
How can we handle sequential execution of the DAGs then? Should we completely re-design the workflow or there is a way to do that with airflow configuration settings?

Thanks,
Greg

Re: BranchPythonOperator and wait_for_downstream

Posted by Maxime Beauchemin <ma...@gmail.com>.
Since `depends_on_past` allows for `skipped` status, it seems like `
wait_for_downstream` should have the same behavior.

I think it's just a matter of changing this line:
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L990
to
TaskInstance.state.in_(State.SUCCESS, State.SKIPPED),

This, and writing a test or two for it and making this clear in the
documentation. I'd approve this PR if no one opposes it.

Max

On Thu, Sep 1, 2016 at 1:24 AM, Greg Lira <hr...@gmail.com> wrote:

> Hi,
>
> For some of our DAGs [where we clear and re-import the staging tables],
> having wait_for_downstream on the tasks is very important.
> However, we have a BranchPythonOperator for happy and error path, so if
> the flow has successfully completed, the tasks of the error path as marked
> as skipped.
> It seems that it doesn't work well with  wait_for_downstream, where
> airflow checks for the task instances with SUCCESS state only.
> Is this 'by design'?
> How can we handle sequential execution of the DAGs then? Should we
> completely re-design the workflow or there is a way to do that with airflow
> configuration settings?
>
> Thanks,
> Greg
>