Posted to dev@airflow.apache.org by Bolke de Bruin <bd...@gmail.com> on 2018/08/04 13:10:19 UTC

The need for LocalTaskJob

Hi Max, Dan et al,

Currently, when a scheduled task runs this happens in three steps: 

1. Worker 
2. LocalTaskJob
3. Raw task instance

It uses (by default) 5 (!) different processes:

1. Worker 
2. Bash + Airflow
3. Bash + Airflow 

I think we can merge the worker and LocalTaskJob, as the latter seems to exist only to track a particular task. This can be done within the worker without side effects. Besides that, I think we can limit the number of (airflow) processes to 2 if we remove the bash dependency. I don’t see any reason to depend on bash.

Can you guys shed some light on what the thoughts were around those choices? Am I missing anything on why they should exist?

Cheers
Bolke

Sent from my iPad

Re: The need for LocalTaskJob

Posted by Maxime Beauchemin <ma...@gmail.com>.
Yes clearly this area needs TLC. Thanks for getting the ball rolling.

Max


Re: The need for LocalTaskJob

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
> On 4 Aug 2018, at 21:25, Bolke de Bruin <bd...@gmail.com> wrote:
> 
> We can just execute “python” just fine. Because it will run in a separate interpreter no issues will come from sys.modules as that is not inherited. Will still parse DAGs in a separate process then. Forking (@ash) probably does not work as that does share sys.modules. 

Some sharing of modules was my idea: if we are careful about which modules we load, only load the airflow core pre-fork, and don't parse any DAG pre-fork, then having the fork share the already-loaded modules is a good thing for speed. Think of it like the preload_app option to a gunicorn worker, where the master loads the app and then forks.
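
To make the preload-then-fork idea concrete, here is a minimal sketch (Unix only, and not Airflow code). The names `run_in_fork` and `child_task` are made up for illustration, `json` stands in for the "airflow core" loaded before the fork, and `colorsys` stands in for a DAG module that is only imported after the fork.

```python
import os
import sys
import json  # noqa: F401  stand-in for the core modules preloaded before forking


def run_in_fork(fn):
    """Fork, run fn() in the child, and return the child's exit code."""
    pid = os.fork()
    if pid == 0:
        # Child: sys.modules is inherited from the parent, so preloaded
        # modules are available without being imported again.
        code = 1
        try:
            code = int(fn())
        finally:
            # Exit immediately so the child never returns into parent code.
            os._exit(code)
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)


def child_task():
    # Hypothetical stand-in for "parse the DAG only after the fork".
    import colorsys  # noqa: F401
    return 0 if "json" in sys.modules else 1


exit_code = run_in_fork(child_task)
# The module imported inside the child does not show up in the parent.
dag_leaked_into_parent = "colorsys" in sys.modules
```

The point of the sketch is the ordering: whatever is imported before `os.fork()` is shared by every child, and whatever is imported after it stays private to that child.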

> [snip]
> 
> I’m writing AIP-2 https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-2+Simplify+process+launching to work this out.

Sounds good. I'm not proposing we try my forking idea yet, and your proposal is a definite improvement from where we are now.



Re: The need for LocalTaskJob

Posted by Bolke de Bruin <bd...@gmail.com>.
It is actually all Executors doing this (at least Local and Celery). And yes (although your description is a bit cryptic), I think you are right.

What you call “shelling out” does not really cover what happens on a process level though. We execute “bash -c” with “shell=True”, which probably makes the issue worse. Basically what happens is “<run_user_shell>” -> “bash -c” -> “python (airflow)”. That’s three processes, and it happens twice.

We can execute “python” directly just fine. Because it runs in a separate interpreter, no issues will come from sys.modules, as that is not inherited. DAGs will still be parsed in a separate process then. Forking (@ash) probably does not work, as that does share sys.modules.
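
A small sketch of that point, under the assumption that the child is started from an argv list with no shell in between. `colorsys` is only a stand-in for a module the parent happens to have cached; the child is a fresh interpreter and does not inherit it.

```python
import subprocess
import sys
import colorsys  # noqa: F401  loaded in the parent only (stand-in for a cached DAG module)

# Code run by the child: report whether the parent's module is already loaded.
CHILD_CODE = "import sys; print('colorsys' in sys.modules)"

# Passing an argv list without shell=True starts exactly one child process:
# the Python interpreter itself, with no bash in between.
result = subprocess.run(
    [sys.executable, "-c", CHILD_CODE],
    capture_output=True,
    text=True,
    check=True,
)

parent_has_module = "colorsys" in sys.modules
child_has_module = result.stdout.strip() == "True"
```

Here `parent_has_module` is true while `child_has_module` is false, which is the isolation that shelling out was being relied on for.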

Same goes for jobs running through sudo and with cgroups. No shell is required at all. 

We can relatively easily extend the executor's worker to take over what LocalTaskJob does. If necessary we can keep it a bit dumber and have it report back via an API or MQ instead of the DB.

The way we handle SIGTERM is pretty messy anyway and not really standard (we need to kill all descendant processes, most of which are our own, e.g. airflow core). It can also be handled within the executor/worker. A cleanup will probably increase reliability.
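
One conventional way to signal a whole tree of descendants is to put the task in its own process group and signal the group. The sketch below shows that technique on POSIX; it is an illustration, not what Airflow currently does, and `terminate_tree` is a made-up helper name.

```python
import os
import signal
import subprocess
import sys


def terminate_tree(proc, timeout=5.0):
    """SIGTERM the child's whole process group, escalating to SIGKILL."""
    pgid = os.getpgid(proc.pid)
    os.killpg(pgid, signal.SIGTERM)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The group ignored SIGTERM; force it down.
        os.killpg(pgid, signal.SIGKILL)
        return proc.wait()


# start_new_session=True makes the child a session and group leader, so
# descendants that stay in the group are reached by killpg as well.
proc = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    start_new_session=True,
)
returncode = terminate_tree(proc)
```

On POSIX, `returncode` is the negated signal number when the child was killed by a signal.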

I’m writing AIP-2 https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-2+Simplify+process+launching to work this out.

B.

Sent from my iPad


Re: The need for LocalTaskJob

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
Comments inline.

> On 4 Aug 2018, at 18:28, Maxime Beauchemin <ma...@gmail.com> wrote:
> 
> Let me confirm I'm understanding this right: we're talking specifically
> about the CeleryExecutor not starting an `airflow run` (not --raw)
> command, and firing up a LocalTaskJob instead? Then we'd still have the
> worker fire up the `airflow run --raw` command?
> 
> Seems reasonable. One thing to keep in mind is the fact that shelling out
> guarantees no `sys.modules` caching, which is a real issue for slowly
> changing DAG definitions. That's the reason why we'd have to reboot the
> scheduler periodically before it used sub-processes to evaluate DAGs. Any
> code that needs to evaluate a DAG should probably be done in a subprocess.

> 
> Shelling out also allows for doing things like unix impersonation and
> applying cgroups. This currently happens between `airflow run` and `airflow
> run --raw`. The parent process also heartbeats and listens for external
> kill signals (kill pills).
> 
> I think what we want is smarter executors and only one level of bash
> command: the `airflow run --raw`, and ideally the system that fires this up
> is not Airflow itself, and cannot be DAG-aware (or it will need to get
> restarted to flush the cache).

Rather than shelling out to `airflow run`, could we instead fork and run the CLI code directly? Shelling out involves parsing the config twice, loading all of the airflow and SQLAlchemy deps twice, etc. I think this would account for a not-insignificant speed difference in the unit tests. In the case of impersonation we'd probably have no option but to exec `airflow`, but most(?) people don't use that?

Avoiding the extra parsing penalty and process when we don't need them might be worth it for the test speed-up alone. And we've already got impersonation covered in the tests, so we'll know that it still works.

> 
> To me that really brings up the whole question of what should be handled by
> the Executor, and what belongs in core Airflow. The Executor needs to do
> more, and Airflow core less.

I agree with the sentiment that Core should do less and Executors more -- many parts of the core are reimplementing what Celery itself could do.


> 
> When you think about how this should all work on Kubernetes, it looks
> something like this:
> * the scheduler, through KubeExecutor, calls the k8s API and tells it to fire
> up an Airflow task
> * container boots up and starts an `airflow run --raw` command
> * k8s handles heartbeats, monitors tasks, knows how to kill a running task
> * the scheduler process (call it supervisor) talks with k8s through
> KubeExecutor and handles zombie cleanup and sending kill pills
> 
> Now because Celery doesn't offer as many guarantees it gets a bit more
> tricky. Is there even a way to send a kill pill through Celery? Are there
> other ways than using a parent process to accomplish this?

It does: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks (at least it does now).
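
As a rough sketch of what sending a kill pill through Celery could look like, using the `revoke` remote-control call from the user guide linked above. The helper name `send_kill_pill` is made up, and `app` is assumed to be an already configured `celery.Celery` instance; nothing here is existing Airflow code.

```python
def send_kill_pill(app, task_id, sig="SIGTERM"):
    """Ask the worker that is running task_id to terminate it.

    `app` is assumed to be a configured celery.Celery instance, and
    `task_id` the id Celery assigned when the task was submitted.
    """
    # revoke(..., terminate=True) tells the worker to signal the process
    # executing the task, not just to skip the task if it is still queued.
    app.control.revoke(task_id, terminate=True, signal=sig)
```

The executor would need to keep the Celery task id for each task instance so that it has something to revoke.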

> 
> At a higher level, it seems like we need to move more logic from core
> Airflow into the executors. For instance, the heartbeat construct should
> probably be 100% handled by the executor, and not an assumption in the core
> code base.
> 
> I think I drifted a bit, hopefully that's still helpful.
> 
> Max

Re: The need for LocalTaskJob

Posted by Maxime Beauchemin <ma...@gmail.com>.
Let me confirm I'm understanding this right: we're talking specifically
about the CeleryExecutor not starting an `airflow run` (not --raw)
command, and firing up a LocalTaskJob instead? Then we'd still have the
worker fire up the `airflow run --raw` command?

Seems reasonable. One thing to keep in mind is the fact that shelling out
guarantees no `sys.modules` caching, which is a real issue for slowly
changing DAG definitions. That's the reason why we used to have to reboot the
scheduler periodically before it used sub-processes to evaluate DAGs. Any
code that needs to evaluate a DAG should probably be run in a subprocess.
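
A small self-contained demonstration of that caching problem. The file name `example_dag.py` and the `SCHEDULE` variable are hypothetical stand-ins for a DAG definition; this is a sketch of the Python behaviour, not of Airflow's DAG loading.

```python
import importlib
import os
import subprocess
import sys
import tempfile

workdir = tempfile.mkdtemp()
module_path = os.path.join(workdir, "example_dag.py")  # hypothetical DAG file


def write_dag(schedule):
    """(Re)write the stand-in DAG definition on disk."""
    with open(module_path, "w") as f:
        f.write(f"SCHEDULE = {schedule!r}\n")


write_dag("@daily")
sys.path.insert(0, workdir)
first = importlib.import_module("example_dag").SCHEDULE

# The definition changes on disk, but a long-running process that imports
# it again just gets the cached entry from sys.modules.
write_dag("@hourly")
cached = importlib.import_module("example_dag").SCHEDULE

# A fresh interpreter has an empty cache and sees the new definition.
fresh = subprocess.run(
    [sys.executable, "-c", "import example_dag; print(example_dag.SCHEDULE)"],
    env={**os.environ, "PYTHONPATH": workdir},
    capture_output=True,
    text=True,
    check=True,
).stdout.strip()
```

After this runs, `cached` still holds the old value while `fresh` holds the new one.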

Shelling out also allows for doing things like unix impersonation and
applying cgroups. This currently happens between `airflow run` and `airflow
run --raw`. The parent process also heartbeats and listens for external
kill signals (kill pills).
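
The parent's role described here (heartbeat plus watching for a kill pill) boils down to a small supervision loop. The sketch below is an illustration only: `supervise`, `heartbeat` and `kill_requested` are made-up names, and impersonation and cgroups are left out.

```python
import subprocess
import sys
import time


def supervise(argv, heartbeat, kill_requested, interval=0.05):
    """Run argv as a child; heartbeat and poll for a kill request until it exits."""
    proc = subprocess.Popen(argv)
    while proc.poll() is None:
        heartbeat()
        if kill_requested():
            proc.terminate()  # sends SIGTERM on POSIX
            proc.wait()
            break
        time.sleep(interval)
    return proc.returncode


beats = []
code = supervise(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    heartbeat=lambda: beats.append(time.time()),
    # Stand-in for an external kill signal: fire after three heartbeats.
    kill_requested=lambda: len(beats) >= 3,
)
```

Whether this loop lives in a separate parent process or inside the executor's worker is exactly the question raised in this thread.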

I think what we want is smarter executors and only one level of bash
command: the `airflow run --raw`, and ideally the system that fires this up
is not Airflow itself, and cannot be DAG-aware (or it will need to get
restarted to flush the cache).

To me that really brings up the whole question of what should be handled by
the Executor, and what belongs in core Airflow. The Executor needs to do
more, and Airflow core less.

When you think about how this should all work on Kubernetes, it looks
something like this:
* the scheduler, through KubeExecutor, calls the k8s API and tells it to fire
up an Airflow task
* container boots up and starts an `airflow run --raw` command
* k8s handles heartbeats, monitors tasks, knows how to kill a running task
* the scheduler process (call it supervisor) talks with k8s through
KubeExecutor and handles zombie cleanup and sending kill pills

Now because Celery doesn't offer as many guarantees it gets a bit more
tricky. Is there even a way to send a kill pill through Celery? Are there
other ways than using a parent process to accomplish this?

At a higher level, it seems like we need to move more logic from core
Airflow into the executors. For instance, the heartbeat construct should
probably be 100% handled by the executor, and not an assumption in the core
code base.

I think I drifted a bit, hopefully that's still helpful.

Max


Re: The need for LocalTaskJob

Posted by Dan Davydov <dd...@twitter.com.INVALID>.
Alex (cc'd) brought this up to me a while ago too, and I agreed
with him. It is definitely something we should do. I remember there were
some things that were a bit tricky about removing the intermediate process
and would be a bit of work to fix (something about the tasks needing to
heartbeat the parent process maybe?).

TLDR: No blockers from me, just might be a bit of work to implement.

