Posted to dev@airflow.apache.org by "grantnicholas2015@u.northwestern.edu" <gr...@u.northwestern.edu> on 2017/08/06 17:25:01 UTC

Repurposing the `queue` field for non-celery executors

Basic proposal:

Change the queue column in the TaskInstance table from a varchar(n) => pickle. Additionally rename the column from queue => executor_config. 

Why?

Currently the `queue` column is only used for the CeleryExecutor since the concept of a `queue` doesn't make sense for other executors. Adding this extra column that is only used by one executor was a fine solution when airflow did not have many pluggable backends, but as the number of backends that need configuration increases it probably makes sense to make some abstraction for the concept of "executor specific configurations". 

This came out of conversations about the KubernetesExecutor, specifically the need for a way to configure properties of the Kubernetes pod on a per-task basis (think: docker image, any volume mounts or secrets, etc). One proposal during the meeting was to add a new "docker_image" column in the baseoperator/task_instance table. However, this requires adding a new custom column that will only be used by one executor, increasing the number of unused fields in the baseoperator and requiring airflow core code changes whenever a new configuration field is needed.

Moving all executor-specific configuration into the `executor_config` field will let us extend airflow as needed with new backends and new configs without any core code changes.
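
As a rough illustration of what a pickled column would hold, here is a minimal sketch using only the standard library. It assumes the column simply stores the pickled bytes of whatever dict the task was given (with SQLAlchemy this would presumably be a `PickleType` column, but that is an assumption, not something settled in this proposal). The dict contents are illustrative.

```python
import pickle

# Hypothetical per-task settings; the keys are illustrative only.
executor_config = {"queue": "my-small-queue"}

# Roughly what would be written to the pickled `executor_config` column.
stored = pickle.dumps(executor_config)

# Roughly what an executor would get back when it loads the task instance.
loaded = pickle.loads(stored)
```

Because the column is opaque to the database, a new key needs no schema migration. The trade-off is that the values can no longer be filtered on in SQL.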

Here is a before and after view of the switch (targeting a celery backend):

t1  = PythonOperator(
    task_id="task1",
    python_callable=my_func,
    queue="my-small-queue"
)

t1  = PythonOperator(
    task_id="task1",
    python_callable=my_func,
    executor_config={"queue": "my-small-queue"}
)
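
On the executor side, reading the queue back out could be as small as the sketch below. The helper name `resolve_queue` and the `"default"` fallback are hypothetical, chosen only for illustration; they are not existing airflow names.

```python
DEFAULT_QUEUE = "default"  # assumed fallback value, hypothetical


def resolve_queue(executor_config, default_queue=DEFAULT_QUEUE):
    """Return the Celery queue for a task, falling back to a default.

    `executor_config` is the dict passed to the operator, or None when the
    task did not set one.
    """
    if not executor_config:
        return default_queue
    return executor_config.get("queue", default_queue)
```

For example, `resolve_queue({"queue": "my-small-queue"})` returns `"my-small-queue"`, while a task with no `executor_config` ends up on the default queue.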

Here is what task configuration for the Kubernetes Executor could look like using this concept:

t1  = PythonOperator(
    task_id="task1",
    python_callable=my_func,
    executor_config={"image": "my-special-image:latest", "volume_mounts" : "..."}
)
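
A minimal sketch of how a Kubernetes executor might turn that dict into pod settings, filling in defaults for anything the task did not specify. The function name `pod_settings` and the default image name are assumptions made for this example, and the shape of `volume_mounts` is left to the executor.

```python
DEFAULT_IMAGE = "airflow:latest"  # hypothetical default image


def pod_settings(executor_config, default_image=DEFAULT_IMAGE):
    """Build the per-task pod settings from an executor_config dict."""
    config = executor_config or {}
    return {
        # Use the task's image when given, otherwise the executor default.
        "image": config.get("image", default_image),
        # Passed through untouched; an empty list means no extra mounts.
        "volume_mounts": config.get("volume_mounts", []),
    }
```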

Let me know what your thoughts are. At my current company, I implemented this feature for us and we are using it extensively to configure our KubernetesExecutor plugin on a per-task basis. While there are a few downsides (mainly having to deal with misconfigurations inside the executor) overall it has worked well for us.
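
To make the misconfiguration downside concrete: since the dict is free-form, a typo in a key only shows up once the executor looks at it. One way an executor could surface this early is sketched below. The expected keys and types are hypothetical, and this is not the code we run internally.

```python
# Hypothetical schema: the keys this executor understands and their types.
EXPECTED_TYPES = {"image": str, "volume_mounts": list}


def find_config_errors(executor_config, expected_types=EXPECTED_TYPES):
    """Return a list of human-readable problems found in executor_config."""
    errors = []
    for key, value in sorted(executor_config.items()):
        if key not in expected_types:
            errors.append("unknown key: %s" % key)
        elif not isinstance(value, expected_types[key]):
            errors.append(
                "%s should be %s, got %s"
                % (key, expected_types[key].__name__, type(value).__name__)
            )
    return errors
```

An executor could log these problems or fail the task before launching anything, rather than silently running with defaults.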

Re: Repurposing the `queue` field for non-celery executors

Posted by Daniel Imberman <da...@gmail.com>.
I think it's a really good idea from the kubernetes executor perspective.
There are a lot of elements that should be automated when launching a pod
(e.g. a bash command wouldn't need the same amount of compute as building a
model with scikit-learn)

@Alexander
> don't you think that you give too
> much power to operators to configure how executors should act?

As long as we are picky about which features we allow them to write keys
for, it should be fine. If I'm only given options such as "CPU size" then
there won't be any problems. I'm a little reluctant to allow users to custom
pick images in non-k8s specified pods, but I think that generalizing a dict
rather than adding new columns for each executor we add could prevent the
table from getting messy in the future.

> Why can't the executor itself decide which config to apply on a per-queue basis?

Isn't that basically what would happen? If I write out a k8s image argument
and then use the celery executor, the executor can just disregard arguments
that don't apply to it.
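
A minimal sketch of that behaviour, assuming each executor declares the keys it understands. The `SUPPORTED_KEYS` table and the executor names in it are hypothetical.

```python
# Hypothetical: the executor_config keys each executor understands.
SUPPORTED_KEYS = {
    "celery": {"queue"},
    "kubernetes": {"image", "volume_mounts"},
}


def applicable_config(executor_name, executor_config):
    """Keep only the keys that the named executor knows how to use."""
    supported = SUPPORTED_KEYS.get(executor_name, set())
    return {k: v for k, v in executor_config.items() if k in supported}
```

With this, a task that sets both `queue` and `image` runs unchanged under either executor; each one just sees its own slice of the dict. The cost is that a misspelled key is dropped silently too.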



On Sun, Aug 6, 2017 at 1:45 PM Alexander Shorin <kx...@gmail.com> wrote:

> I think it's a good intention, but don't you think that you give too
> much power to operators to configure how executors should act?
>
> Why can't the executor itself decide which config to apply on a per-queue
> basis? It's a question for the ops domain, not devs.
>
>
> --
> ,,,^..^,,,
>
>

Re: Repurposing the `queue` field for non-celery executors

Posted by Alexander Shorin <kx...@gmail.com>.
I think it's a good intention, but don't you think that you give too
much power to operators to configure how executors should act?

Why can't the executor itself decide which config to apply on a per-queue
basis? It's a question for the ops domain, not devs.


--
,,,^..^,,,

