Posted to dev@airflow.apache.org by Ping Zhang <pi...@umich.edu> on 2021/12/17 00:19:45 UTC

[DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Hi Airflow Community,


We (Airbnb) would like to open source an internally developed feature:
streaming deserialization of the airflow task in the airflow run
process to avoid dag parsing.

This has been running in our prod for around 3 quarters and it is very
stable. It helped bring the average task start time down from 24 seconds
to 3 seconds (across all 3 airflow clusters) and reduced peak memory
usage by around 45%.

I will create an AIP after the discussion.

Thanks,

Ping

Motivation

To run airflow tasks, airflow needs to parse the dag file twice: once in
the airflow run local process and once in airflow run raw. This wastes
memory, doubling the RAM spent on dag parsing. During peak hours, the CPU
can spike for a long time due to the double parsing, slowing down task
start times and even causing dag parsing timeouts. This is especially
true when there are large, complex dags in the airflow cluster.

What change do you propose to make?

The dag parsing in `airflow run local` can be removed by reading the dag
from the serialized_dag table. Memory usage can be reduced further by
deserializing only the target task, in a streaming way, instead of
deserializing the whole dag object.
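
A minimal sketch of the shape of the change (not the actual Airbnb
implementation). Part (a) relies on the DagBag(read_dags_from_db=True)
path that already exists in Airflow 2.x; the single-task helper in part
(b) is hypothetical and only illustrates the streaming idea, with field
names based on the serialized JSON layout around Airflow 2.2:

from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.serialized_objects import SerializedBaseOperator


def get_dag_from_db(dag_id: str):
    """(a) Load the whole DAG from the serialized_dag table; no dag file parsing."""
    return DagBag(read_dags_from_db=True).get_dag(dag_id)


def get_task_from_db(dag_id: str, task_id: str):
    """(b) Hypothetical streaming path: deserialize only the target task from
    the stored JSON instead of materializing every operator in the DAG."""
    sdm = SerializedDagModel.get(dag_id)  # row from the serialized_dag table
    if sdm is None:
        raise ValueError(f"DAG {dag_id} is not in the serialized_dag table")
    # sdm.data is the serialized DAG as a JSON-compatible dict; walk the
    # per-task dicts and stop as soon as the target task is found.
    for encoded_task in sdm.data["dag"]["tasks"]:
        if encoded_task.get("task_id") == task_id:
            return SerializedBaseOperator.deserialize_operator(encoded_task)
    raise ValueError(f"Task {task_id} is not in DAG {dag_id}")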

Re: [DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Posted by Ping Zhang <pi...@umich.edu>.
Hi Kaxil, Jarek and Ash,

Here is the AIP
https://cwiki.apache.org/confluence/display/AIRFLOW/Remove+double+dag+parsing+in+airflow+run.
Looking forward to your feedback.

Thanks,

Ping


On Mon, Dec 20, 2021 at 10:46 AM Ping Zhang <pi...@umich.edu> wrote:

> Hi Kaxil,
>
> Thanks for the comment. The serialized_dag isn't used to run the task in
> the `airflow run --raw` process. It is used in the `airflow run --local`
> process to perform `check_and_change_state_before_execution`
>
> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L88-L99
>
>
> Thanks,
>
> Ping
>
>
> On Mon, Dec 20, 2021 at 4:51 AM Kaxil Naik <ka...@gmail.com> wrote:
>
>> Yup, forking only applies when os.fork is available and run_as_user
>> isn't specified. We only added enough detail to Serialized DAGs to cover
>> what the Webserver needs and what the Scheduler needs to make scheduling
>> decisions.
>>
>> So it does not contain all the information (all the args, kwargs
>> including callables) required to run the task.
>>
>> Looking forward to the AIP.
>>
>> Regards,
>> Kaxil
>>
>> On Fri, Dec 17, 2021 at 11:04 PM Ping Zhang <pi...@umich.edu> wrote:
>>
>>> Hi Ash,
>>>
>>> Thanks for the inputs about the fork approach. I have checked the code.
>>> The fork only applies when there is no run_as_user. I think the run_as_user
>>> is an important feature.
>>>
>>> I will create an AIP with more details.
>>>
>>> Best wishes
>>>
>>> Ping Zhang
>>>
>>>
>>> On Fri, Dec 17, 2021 at 9:59 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>> Yeah. I would also love to see some details in the meeting I proposed
>>>> :). I am particularly interested in the current limitation of the
>>>> solution in the "general" case.
>>>>
>>>> J,
>>>>
>>>> On Fri, Dec 17, 2021 at 11:16 AM Ash Berlin-Taylor <as...@apache.org>
>>>> wrote:
>>>> >
>>>> > On Thu, Dec 16 2021 at 16:19:45 -0800, Ping Zhang <pi...@umich.edu>
>>>> wrote:
>>>> >
>>>> > To run airflow tasks, airflow needs to parse dag file twice, once in
>>>> airflow run local process, once in airflow run raw
>>>> >
>>>> >
>>>> > This isn't true in most cases anymore thanks to a change from
>>>> spawning a new process (os.exec(["airflow", ...])) to forking instead.
>>>> >
>>>> > The serialized_dag table doesn't (currently) contain enough
>>>> information to actually execute every dag, especially in the case of
>>>> PythonOperator, so the actual dag file on disk needs to be loaded to get
>>>> code to run, so perhaps it would be possible to do this for some operators,
>>>> but not all.
>>>> >
>>>> > Still might be worth looking at it and I'm looking forward to the
>>>> proposal!
>>>> >
>>>> > -ash
>>>>
>>>

Re: [DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Posted by Ping Zhang <pi...@umich.edu>.
Hi Kaxil,

Thanks for the comment. The serialized_dag isn't used to run the task in
the `airflow run --raw` process. It is used in the `airflow run --local`
process to perform `check_and_change_state_before_execution`:
https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L88-L99
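
Roughly, the relevant logic looks like this (a simplified paraphrase of
the linked code with most flags elided, not a verbatim copy). With this
proposal, the `dag` object would come from the serialized_dag table
instead of a fresh parse of the dag file:

def run_local(ti, dag, task_runner, mark_success=False, pool=None):
    # Paraphrase of the linked LocalTaskJob logic (flags elided). The local
    # process only needs the task definition to check dependencies and flip
    # the TaskInstance state before handing off execution.
    ti.task = dag.get_task(ti.task_id)
    if not ti.check_and_change_state_before_execution(
        mark_success=mark_success,
        pool=pool,
    ):
        return  # deps not met or state changed underneath us; do not start
    task_runner.start()  # hands off to the `airflow run --raw` process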


Thanks,

Ping


On Mon, Dec 20, 2021 at 4:51 AM Kaxil Naik <ka...@gmail.com> wrote:

> Yup, forking only applies when os.fork is available and run_as_user isn't
> specified. We only added enough detail to Serialized DAGs to cover what
> the Webserver needs and what the Scheduler needs to make scheduling
> decisions.
>
> So it does not contain all the information (all the args, kwargs including
> callables) required to run the task.
>
> Looking forward to the AIP.
>
> Regards,
> Kaxil
>
> On Fri, Dec 17, 2021 at 11:04 PM Ping Zhang <pi...@umich.edu> wrote:
>
>> Hi Ash,
>>
>> Thanks for the inputs about the fork approach. I have checked the code.
>> The fork only applies when there is no run_as_user. I think the run_as_user
>> is an important feature.
>>
>> I will create an AIP with more details.
>>
>> Best wishes
>>
>> Ping Zhang
>>
>>
>> On Fri, Dec 17, 2021 at 9:59 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Yeah. I would also love to see some details in the meeting I proposed
>>> :). I am particularly interested in the current limitation of the
>>> solution in the "general" case.
>>>
>>> J,
>>>
>>> On Fri, Dec 17, 2021 at 11:16 AM Ash Berlin-Taylor <as...@apache.org>
>>> wrote:
>>> >
>>> > On Thu, Dec 16 2021 at 16:19:45 -0800, Ping Zhang <pi...@umich.edu>
>>> wrote:
>>> >
>>> > To run airflow tasks, airflow needs to parse dag file twice, once in
>>> airflow run local process, once in airflow run raw
>>> >
>>> >
>>> > This isn't true in most cases anymore thanks to a change from spawning
>>> a new process (os.exec(["airflow", ...])) to forking instead.
>>> >
>>> > The serialized_dag table doesn't (currently) contain enough
>>> information to actually execute every dag, especially in the case of
>>> PythonOperator, so the actual dag file on disk needs to be loaded to get
>>> code to run, so perhaps it would be possible to do this for some operators,
>>> but not all.
>>> >
>>> > Still might be worth looking at it and I'm looking forward to the
>>> proposal!
>>> >
>>> > -ash
>>>
>>

Re: [DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Posted by Kaxil Naik <ka...@gmail.com>.
Yup, forking only applies when os.fork is available and run_as_user isn't
specified. We only added enough detail to Serialized DAGs to cover what
the Webserver needs and what the Scheduler needs to make scheduling
decisions.

So it does not contain all the information (all the args, kwargs including
callables) required to run the task.

Looking forward to the AIP.

Regards,
Kaxil

On Fri, Dec 17, 2021 at 11:04 PM Ping Zhang <pi...@umich.edu> wrote:

> Hi Ash,
>
> Thanks for the inputs about the fork approach. I have checked the code.
> The fork only applies when there is no run_as_user. I think the run_as_user
> is an important feature.
>
> I will create an AIP with more details.
>
> Best wishes
>
> Ping Zhang
>
>
> On Fri, Dec 17, 2021 at 9:59 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Yeah. I would also love to see some details in the meeting I proposed
>> :). I am particularly interested in the current limitation of the
>> solution in the "general" case.
>>
>> J,
>>
>> On Fri, Dec 17, 2021 at 11:16 AM Ash Berlin-Taylor <as...@apache.org>
>> wrote:
>> >
>> > On Thu, Dec 16 2021 at 16:19:45 -0800, Ping Zhang <pi...@umich.edu>
>> wrote:
>> >
>> > To run airflow tasks, airflow needs to parse dag file twice, once in
>> airflow run local process, once in airflow run raw
>> >
>> >
>> > This isn't true in most cases anymore thanks to a change from spawning
>> a new process (os.exec(["airflow", ...])) to forking instead.
>> >
>> > The serialized_dag table doesn't (currently) contain enough information
>> to actually execute every dag, especially in the case of PythonOperator, so
>> the actual dag file on disk needs to be loaded to get code to run, so
>> perhaps it would be possible to do this for some operators, but not all.
>> >
>> > Still might be worth looking at it and I'm looking forward to the
>> proposal!
>> >
>> > -ash
>>
>

Re: [DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Posted by Ping Zhang <pi...@umich.edu>.
Hi Ash,

Thanks for the inputs about the fork approach. I have checked the code. The
fork only applies when there is no run_as_user. I think the run_as_user is
an important feature.
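
For reference, the dispatch in
airflow/task/task_runner/standard_task_runner.py looks roughly like this
(paraphrased; exact names may differ between Airflow versions):

# Forking is only used when os.fork is available and no run_as_user is set;
# otherwise the task is re-launched via `airflow run --raw`, which parses
# the dag file a second time.
if CAN_FORK and not self.run_as_user:
    self.process = self._start_by_fork()
else:
    self.process = self._start_by_exec()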

I will create an AIP with more details.

Best wishes

Ping Zhang


On Fri, Dec 17, 2021 at 9:59 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Yeah. I would also love to see some details in the meeting I proposed
> :). I am particularly interested about the current limitation of the
> solution in "general" case.
>
> J,
>
> On Fri, Dec 17, 2021 at 11:16 AM Ash Berlin-Taylor <as...@apache.org> wrote:
> >
> > On Thu, Dec 16 2021 at 16:19:45 -0800, Ping Zhang <pi...@umich.edu>
> wrote:
> >
> > To run airflow tasks, airflow needs to parse dag file twice, once in
> airflow run local process, once in airflow run raw
> >
> >
> > This isn't true in most cases anymore thanks to a change from spawning a
> new process (os.exec(["airflow", ...])) to forking instead.
> >
> > The serialized_dag table doesn't (currently) contain enough information
> to actually execute every dag, especially in the case of PythonOperator, so
> the actual dag file on disk needs to be loaded to get code to run, so
> perhaps it would be possible to do this for some operators, but not all.
> >
> > Still might be worth looking at it and I'm looking forward to the
> proposal!
> >
> > -ash
>

Re: [DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Posted by Jarek Potiuk <ja...@potiuk.com>.
Yeah. I would also love to see some details in the meeting I proposed
:). I am particularly interested in the current limitation of the
solution in the "general" case.

J,

On Fri, Dec 17, 2021 at 11:16 AM Ash Berlin-Taylor <as...@apache.org> wrote:
>
> On Thu, Dec 16 2021 at 16:19:45 -0800, Ping Zhang <pi...@umich.edu> wrote:
>
> To run airflow tasks, airflow needs to parse dag file twice, once in airflow run local process, once in airflow run raw
>
>
> This isn't true in most cases anymore thanks to a change from spawning a new process (os.exec(["airflow", ...])) to forking instead.
>
> The serialized_dag table doesn't (currently) contain enough information to actually execute every dag, especially in the case of PythonOperator, so the actual dag file on disk needs to be loaded to get code to run, so perhaps it would be possible to do this for some operators, but not all.
>
> Still might be worth looking at it and I'm looking forward to the proposal!
>
> -ash

Re: [DISCUSS] Leverage serialized DAG in airflow run local process to avoid dag parsing

Posted by Ash Berlin-Taylor <as...@apache.org>.
On Thu, Dec 16 2021 at 16:19:45 -0800, Ping Zhang <pi...@umich.edu> 
wrote:
> To run airflow tasks, airflow needs to parse dag file twice, once in 
> airflow run local process, once in airflow run raw

This isn't true in most cases anymore thanks to a change from spawning
a new process (os.exec(["airflow", ...])) to forking instead.

The serialized_dag table doesn't (currently) contain enough information 
to actually execute every dag, especially in the case of 
PythonOperator, so the actual dag file on disk needs to be loaded to 
get code to run, so perhaps it would be possible to do this for some 
operators, but not all.
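
A tiny sketch of that point (assuming Airflow 2.x serialization APIs;
the exact JSON layout varies by version): the serialized form keeps
JSON-encodable attributes, but the function object behind
python_callable cannot be stored there, so the raw process still has to
import the dag file to get runnable code.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.serialization.serialized_objects import SerializedDAG


def my_business_logic():
    print("only defined in the dag file on disk")


with DAG("demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    PythonOperator(task_id="t1", python_callable=my_business_logic)

# The serialized task dict contains scheduling metadata, not the callable itself.
print(SerializedDAG.to_dict(dag)["dag"]["tasks"][0])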

Still might be worth looking at it and I'm looking forward to the 
proposal!

-ash