Posted to commits@airflow.apache.org by "Kyle Hamlin (JIRA)" <ji...@apache.org> on 2019/01/07 22:39:00 UTC

[jira] [Work started] (AIRFLOW-3645) Use a base_executor_config and merge operator level executor_config

     [ https://issues.apache.org/jira/browse/AIRFLOW-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on AIRFLOW-3645 started by Kyle Hamlin.
--------------------------------------------
> Use a base_executor_config and merge operator level executor_config
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-3645
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Kyle Hamlin
>            Assignee: Kyle Hamlin
>            Priority: Major
>             Fix For: 1.10.2
>
>
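> It would be very useful to have a global `base_executor_config` that is deep-merged with any operator-level `executor_config`. Settings shared by every task (image pull policy, annotations, volumes) would then be declared once instead of on each operator.
> I imagine referencing a Python dict by its dotted path in `airflow.cfg`, similar to how a custom logging config is referenced via `logging_config_class`.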
> *Example config*
> {code:java}
> [core]
> base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
> {code}
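The `base_executor_config` value is a dotted path. Everything before the last dot names a module, and the last segment names an attribute in that module. Below is a minimal sketch of that resolution using only the standard library. It approximates Airflow's `import_string` but is not that function. The name `resolve_dotted_path` and the `config/base_executor_config.py` layout are hypothetical.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Import ``module.path.ATTRIBUTE`` and return ATTRIBUTE.

    Hypothetical stand-in for airflow.utils.module_loading.import_string.
    """
    try:
        module_path, attr_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError("%r is not a dotted module path" % dotted_path) from None
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            "module %r has no attribute %r" % (module_path, attr_name)) from None


# Assuming config/base_executor_config.py is importable (on PYTHONPATH),
# this would return its BASE_EXECUTOR_CONFIG dict:
# resolve_dotted_path("config.base_executor_config.BASE_EXECUTOR_CONFIG")
```

A path without a dot, or one naming a missing attribute, raises `ImportError` rather than returning `None`. A typo in `airflow.cfg` therefore surfaces immediately.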
> *Example base_executor_config*
> {code:python}
> BASE_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "image_pull_policy": "Always",
>         "annotations": {
>             "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
>         },
>         "volumes": [
>             {
>                 "name": "airflow-lib",
>                 "persistentVolumeClaim": {
>                     "claimName": "airflow-lib"
>                 }
>             }
>         ],
>         "volume_mounts": [
>             {
>                 "name": "airflow-lib",
>                 "mountPath": "/usr/local/airflow/lib",
>             }
>         ]
>     }
> }
> {code}
> *Example operator*
> {code:python}
> run_this = PythonOperator(
>     task_id='print_the_context',
>     provide_context=True,
>     python_callable=print_context,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "256Mi",
>             "request_cpu": "100m",
>             "limit_memory": "256Mi",
>             "limit_cpu": "100m"
>         }
>     },
>     dag=dag)
> {code}
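> Then we'll want a recursive (deep) dict merge and a helper that returns the merged executor_config. Operator-level values win on conflicts. The imported base dict is deep-copied first, so one operator's settings never leak into the shared module-level dict.
> *Merge functionality*
> {code:python}
> import copy
> from collections.abc import Mapping
>
> from airflow.configuration import conf
> from airflow.exceptions import AirflowConfigException
> from airflow.utils.module_loading import import_string
>
>
> def dict_merge(dct, merge_dct):
>     """Recursive dict merge. Inspired by :meth:`dict.update`, but instead of
>     updating only top-level keys, dict_merge recurses down into dicts nested
>     to an arbitrary depth, updating keys. ``merge_dct`` is merged into
>     ``dct`` in place.
>
>     :param dct: dict onto which the merge is executed
>     :param merge_dct: dict merged into ``dct``; its values win on conflict
>     :return: dct
>     """
>     for k, v in merge_dct.items():
>         if k in dct and isinstance(dct[k], dict) and isinstance(v, Mapping):
>             # Both sides hold a mapping: recurse instead of overwriting.
>             dict_merge(dct[k], v)
>         else:
>             # New key or leaf value (lists included): replace outright.
>             dct[k] = v
>     return dct
>
>
> def get_executor_config(executor_config):
>     """Try to import base_executor_config and merge the provided
>     executor_config into a copy of it.
>
>     :param executor_config: operator level executor config (may be None)
>     :return: dict
>     """
>     executor_config = executor_config or {}
>     try:
>         path = conf.get('core', 'base_executor_config')
>     except AirflowConfigException:
>         # No base config is set: use the operator config unchanged.
>         return executor_config
>     # Merge onto a deep copy; merging into the imported dict itself would
>     # mutate it and carry this operator's settings into every later task.
>     base_executor_config = copy.deepcopy(import_string(path))
>     return dict_merge(base_executor_config, executor_config)
> {code}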
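Applied to the two example dicts above, the deep merge keeps every base key. It also adds the operator's resource settings under the same `KubernetesExecutor` key. The self-contained sketch below uses only the standard library. `merged_config` is a hypothetical helper that mirrors `get_executor_config` without the Airflow config lookup. The base dict is trimmed to two keys, and the IAM ARN stays a placeholder.

```python
import copy
from collections.abc import Mapping


def dict_merge(dct, merge_dct):
    """Recursively merge merge_dct into dct in place and return dct."""
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, Mapping):
            dict_merge(dct[k], v)
        else:
            dct[k] = v
    return dct


def merged_config(base, executor_config):
    """Hypothetical helper: merge onto a deep copy so `base` stays intact."""
    return dict_merge(copy.deepcopy(base), executor_config or {})


BASE_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "image_pull_policy": "Always",
        "annotations": {"iam.amazonaws.com/role": "arn:aws:iam::<some arn>"},
    }
}

operator_config = {
    "KubernetesExecutor": {
        "request_memory": "256Mi",
        "request_cpu": "100m",
        "limit_memory": "256Mi",
        "limit_cpu": "100m",
    }
}

result = merged_config(BASE_EXECUTOR_CONFIG, operator_config)
print(sorted(result["KubernetesExecutor"]))
# ['annotations', 'image_pull_policy', 'limit_cpu', 'limit_memory', 'request_cpu', 'request_memory']
```

Only nested dicts are merged. Any other value, including a list such as `volumes`, is replaced outright. An operator that sets its own `volumes` therefore replaces the base list rather than appending to it.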
> Finally, `BaseOperator` would pass the operator's `executor_config` through `get_executor_config` when it is constructed, possibly here: https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348
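The sketch below shows one way the call could be wired in. It uses a stand-in class rather than Airflow's real `BaseOperator`. The class name `SketchOperator` and the inline `BASE` dict are hypothetical, and a real patch would call `get_executor_config` instead. The sketch demonstrates that merging at construction time gives each operator its own merged copy, so one task's settings do not show up on another.

```python
import copy
from collections.abc import Mapping

# Hypothetical base config standing in for the dict that
# get_executor_config would import via [core] base_executor_config.
BASE = {"KubernetesExecutor": {"image_pull_policy": "Always"}}


def dict_merge(dct, merge_dct):
    """Recursively merge merge_dct into dct in place and return dct."""
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, Mapping):
            dict_merge(dct[k], v)
        else:
            dct[k] = v
    return dct


class SketchOperator:
    """Hypothetical stand-in for BaseOperator; only executor_config handling."""

    def __init__(self, task_id, executor_config=None):
        self.task_id = task_id
        # Merge at construction time, onto a fresh copy of the base config.
        self.executor_config = dict_merge(
            copy.deepcopy(BASE), executor_config or {})


small = SketchOperator("small", {"KubernetesExecutor": {"limit_cpu": "100m"}})
plain = SketchOperator("plain")
print(small.executor_config)
# {'KubernetesExecutor': {'image_pull_policy': 'Always', 'limit_cpu': '100m'}}
print(plain.executor_config)
# {'KubernetesExecutor': {'image_pull_policy': 'Always'}}
```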



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)