Posted to dev@airflow.apache.org by Paul Elliot <pa...@mymusictaste.com> on 2017/08/17 10:14:50 UTC

Issue with Paused SubDAG

Hi all,

We've been using Airflow for the past year in our data operations. Been
really happy with it so far!

Environment: Airflow 1.8.0 with the LocalExecutor running on a single
machine.

I recently ran into an issue with a DAG containing one subdag operator with
around 20-30 simple ETL subdags. Relevant code is posted below. I poked
around the mailing list archives but couldn't find anything related.

When running the main DAG, things seem to work for the majority of the
tasks. Then the DAG freezes: scheduler looks fine, main DAG is not paused,
but several of the subdags never run. Their tasks are registered with the
scheduler, and the UI shows the subdag operator task to be 'running'. The
subdag tasks which aren't running have status 'null'. Clicking 'Task
Instance Details' shows me a dependencies problem:

    Dag Not Paused: Task's DAG 'ticket_gateways.ticket_gateways_subdag' is
    paused.

For clarity, "ticket_gateways.ticket_gateways_subdag" is one of the paused
subdags in question. Is there a reason why this happens, and is there
anything I can do to avoid it? Or even just: how do I unpause the subdags?

Thanks,
Paul



# ... import stuff ...

run_date = "{{ ds }}"


def subdag_generator(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None,
    )

    for tg, task, tables in get_ticket_gateways():
        crawl_event_task = PythonOperator(
            task_id='{}-{}_extract_event'.format(child_dag_name, tg),
            default_args=args,
            provide_context=True,
            op_args=['Tickets.extractors', task],
            templates_dict={'date': run_date},
            op_kwargs={'mode': 'extract'},
            trigger_rule='all_success',
            python_callable=airflow_callable,
            dag=dag_subdag)

        process_event_task = PythonOperator(
            task_id='{}-{}_process_event'.format(child_dag_name, tg),
            default_args=args,
            provide_context=True,
            op_args=['Tickets.extractors', task],
            op_kwargs={'mode': 'process'},
            templates_dict={'date': run_date},
            trigger_rule='all_success',
            python_callable=airflow_callable,
            dag=dag_subdag)

        post_to_slack_task = SlackAPIPostOperator(
            task_id='{}-{}_post_to_slack'.format(child_dag_name, tg),
            default_args=args,
            token=SLACK_TOKEN,
            channel=SLACK_LOG_CHANNEL,
            text='{} finished running for execution {}.'.format(
                tg, run_date),
            dag=dag_subdag)

        crawl_event_task.set_downstream(process_event_task)
        process_event_task.set_downstream(post_to_slack_task)

    return dag_subdag


dag = DAG(
    dag_id='ticket_gateways', default_args=DAG_DEFAULT_ARGS,
    schedule_interval=get_dag_schedule()) # just '@weekly' or '@once'

subdag_task = SubDagOperator(
    task_id='ticket_gateways_subdag',
    subdag=subdag_generator(
        'ticket_gateways', 'ticket_gateways_subdag', DAG_DEFAULT_ARGS),
    dag=dag
)

post_to_slack_task = SlackAPIPostOperator(
    task_id='ticket_gateways_post_to_slack',
    token=SLACK_TOKEN,
    channel=SLACK_LOG_CHANNEL,
    trigger_rule='all_done',
    text='success',
    dag=dag
)

notify_slack_task = SlackAPIPostOperator(
    task_id='ticket_gateways_notify_slack',
    token=SLACK_TOKEN,
    channel=SLACK_NOTIFICATION_CHANNEL,
    trigger_rule='one_failed',
    text='failed',
    dag=dag
)

subdag_task.set_downstream([post_to_slack_task, notify_slack_task])


if __name__ == "__main__":
    dag.cli()

-- 

*Paul Elliott | 엘리엇폴*
Developer / Platform Dev. team
paul@mymusictaste.com
+82-10-2990-8642

Re: Issue with Paused SubDAG

Posted by Paul Elliot <pa...@mymusictaste.com>.
[OP found a solution]

Alright so I did some more digging and found the references to subdag
issues in 1.8.0, particularly in a thread "1.8.0 Backfill Clarification"
which helped quite a bit.

We ended up switching to a vanilla main DAG with a fat branch:

    (start task) < { a fat list of parallel branch pipelines
                     with 3 or so tasks each } > (end task) - (..other stuff..)
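
A minimal sketch of that fan-out/fan-in layout, assuming a hypothetical list
of gateway names and the three task suffixes used in the subdag code earlier
in the thread. It only computes (upstream, downstream) task_id pairs; in a
real DAG file each pair would presumably become one
upstream.set_downstream(downstream) call on the matching operators. The
'start' and 'end' task ids are illustrative placeholders, not names from the
original code.

```python
def fan_out_edges(gateways,
                  steps=("extract_event", "process_event", "post_to_slack"),
                  start="start", end="end"):
    """Return (upstream_id, downstream_id) pairs for start -> N chains -> end."""
    edges = []
    for tg in gateways:
        # One linear chain of task ids per gateway.
        chain = ["{}_{}".format(tg, step) for step in steps]
        edges.append((start, chain[0]))        # fan out from the start task
        for up, down in zip(chain, chain[1:]):
            edges.append((up, down))           # links inside the chain
        edges.append((chain[-1], end))         # fan in to the end task
    return edges


if __name__ == "__main__":
    # Hypothetical gateway names, for illustration only.
    for up, down in fan_out_edges(["gateway_a", "gateway_b"]):
        print("{} -> {}".format(up, down))
```

Since every chain hangs directly off the main DAG, there is no separate
subdag DAG object that could end up paused on its own.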

In an attempt to be useful: I did notice a couple of things when hacking at
the subdag solution.

1. The Airflow subdag docs
<https://airflow.incubator.apache.org/concepts.html#subdags> state:

> SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is
> set to None or @once, the SubDAG will succeed without having done anything

That doesn't seem to be true, in my experience. Both the working example
using subdags and my own implementation give the subdag a schedule of None,
and both do run the subdag tasks (they certainly don't succeed without
having done anything).

2. This is probably known, and is likely the deadlock issue mentioned in the
"1.8.0 Backfill Clarification" thread, but if the main DAG's subdag operator
has retries set to 0, or to fewer retries than ANY of the subdag tasks, the
'backfill' gets deadlocked. Setting all subdag tasks and my subdag operator
to retries=0 solved that, but not my other problems.
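
One way to keep the retries value identical for the subdag operator and
every subdag task is to derive a single arguments dict and hand it to both.
A minimal sketch, assuming DAG_DEFAULT_ARGS is a plain dict; its real
contents are not shown in this thread, so the values below are placeholders,
and with_no_retries is a hypothetical helper name.

```python
# Placeholder values; the real DAG_DEFAULT_ARGS is not shown in the thread.
DAG_DEFAULT_ARGS = {"owner": "airflow", "retries": 2}


def with_no_retries(default_args):
    """Return a copy of default_args with retries forced to 0."""
    args = dict(default_args)  # shallow copy, the original dict is untouched
    args["retries"] = 0
    return args


NO_RETRY_ARGS = with_no_retries(DAG_DEFAULT_ARGS)
# NO_RETRY_ARGS would then be passed as default_args to the parent DAG
# (which the subdag operator belongs to) and to subdag_generator(...),
# so both sides see the same retries value.
```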

Regardless, thanks to whoever/whatever snippet of code allowed me on the
mailing list ;)

P

On Mon, Aug 21, 2017 at 12:34 PM, Paul Elliot <pa...@mymusictaste.com> wrote:

> Quick update, I see a log message on the paused subdag tasks in question:
>
> FIXME: Rescheduling due to concurrency limits reached at task runtime.
> Attempt 1 of 2. State set to NONE.
>
> I dunno if that helps or if there's something I can do about this.
>
> Cheers,
> P

Re: Issue with Paused SubDAG

Posted by Paul Elliot <pa...@mymusictaste.com>.
Quick update, I see a log message on the paused subdag tasks in question:

FIXME: Rescheduling due to concurrency limits reached at task runtime.
Attempt 1 of 2. State set to NONE.

I dunno if that helps or if there's something I can do about this.

Cheers,
P
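
The log message above mentions concurrency limits. Airflow 1.8 has a
dag_concurrency setting in airflow.cfg and a concurrency argument on DAG,
both of which cap how many task instances of one DAG may run at the same
time. A rough sketch of the arithmetic, assuming a limit of 16 (an
assumption, not a value confirmed anywhere in this thread) and that each
gateway chain is linear, so it contributes at most one runnable task at a
time. Whether this is what triggered the message here is not established in
the thread.

```python
def tasks_over_limit(n_chains, concurrency_limit):
    """Count simultaneously runnable tasks that exceed the limit.

    Each chain is linear (extract -> process -> post), so at most one task
    per chain is runnable at any moment.
    """
    return max(0, n_chains - concurrency_limit)


if __name__ == "__main__":
    # 30 chains is the upper end of the "20-30" figure from the thread;
    # 16 is an assumed limit.
    print(tasks_over_limit(30, 16))
```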
