Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/09/11 16:27:00 UTC

[jira] [Commented] (AIRFLOW-3645) Use a base_executor_config and merge operator level executor_config

    [ https://issues.apache.org/jira/browse/AIRFLOW-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927751#comment-16927751 ] 

ASF GitHub Bot commented on AIRFLOW-3645:
-----------------------------------------

stale[bot] commented on pull request #4456: [AIRFLOW-3645] Add base_executor_config
URL: https://github.com/apache/airflow/pull/4456
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Use a base_executor_config and merge operator level executor_config
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-3645
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: executors
>            Reporter: Kyle Hamlin
>            Assignee: Kyle Hamlin
>            Priority: Major
>             Fix For: 2.0.0
>
>
> It would be very useful to have a `base_executor_config` and merge the base config with any operator level `executor_config`.
> I imagine referencing a Python dict, similar to how we reference a custom logging_config.
> *Example config*
> {code:java}
> [core]
> base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
> {code}
> *Example base_executor_config*
> {code:java}
> BASE_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "image_pull_policy": "Always",
>         "annotations": {
>             "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
>         },
>         "volumes": [
>             {
>                 "name": "airflow-lib",
>                 "persistentVolumeClaim": {
>                     "claimName": "airflow-lib"
>                 }
>             }
>         ],
>         "volume_mounts": [
>             {
>                 "name": "airflow-lib",
>                 "mountPath": "/usr/local/airflow/lib",
>             }
>         ]
>     }
> }
> {code}
> *Example operator*
> {code:java}
> run_this = PythonOperator(
>     task_id='print_the_context',
>     provide_context=True,
>     python_callable=print_context,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "256Mi",
>             "request_cpu": "100m",
>             "limit_memory": "256Mi",
>             "limit_cpu": "100m"
>         }
>     },
>     dag=dag)
> {code}
> Then we'll want a deep dict merge function that returns the merged executor_config.
> *Merge functionality*
> {code:java}
> import copy
> from collections.abc import Mapping
>
> from airflow import conf
> from airflow.utils.module_loading import import_string
>
>
> def dict_merge(dct, merge_dct):
>     """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
>     updating only top-level keys, dict_merge recurses down into dicts nested
>     to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
>     ``dct``.
>     :param dct: dict onto which the merge is executed
>     :param merge_dct: dict merged into dct
>     :return: dct
>     """
>     for k, v in merge_dct.items():
>         # Recurse only when both sides hold a mapping; otherwise overwrite.
>         if (k in dct and isinstance(dct[k], dict)
>                 and isinstance(v, Mapping)):
>             dict_merge(dct[k], v)
>         else:
>             dct[k] = v
>
>     return dct
>
>
> def get_executor_config(executor_config):
>     """Try to import base_executor_config and merge it with provided
>     executor_config.
>     :param executor_config: operator level executor config
>     :return: dict"""
>
>     try:
>         base_executor_config = import_string(
>             conf.get('core', 'base_executor_config'))
>         # Merge into a copy: dict_merge mutates its first argument, and the
>         # imported base dict is shared by every operator.
>         merged_executor_config = dict_merge(
>             copy.deepcopy(base_executor_config), executor_config)
>         return merged_executor_config
>     except Exception:
>         # Fall back to the operator level config if the base config is
>         # unset or cannot be imported.
>         return executor_config
> {code}
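
To make the merge semantics concrete, here is a minimal standalone sketch that merges a trimmed version of the example base config with an operator level config. The `deep_merge` name and the `scratch` volume are illustrative assumptions, not part of the proposal or of Airflow. The sketch returns a new dict rather than updating the base in place, and it shows one consequence of this style of merge: nested dicts are combined key by key, but lists such as `volumes` are replaced wholesale by the operator level value.

```python
import copy
from collections.abc import Mapping


def deep_merge(base, override):
    """Return a new dict with ``override`` merged into a deep copy of ``base``."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(merged.get(key), dict) and isinstance(value, Mapping):
            # Both sides are mappings: merge them key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars and lists from the override replace the base value.
            merged[key] = copy.deepcopy(value)
    return merged


# Trimmed version of the example base config from the issue.
BASE_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "image_pull_policy": "Always",
        "volumes": [{"name": "airflow-lib"}],
    }
}

# Operator level config; the "scratch" volume is a made-up example value.
operator_config = {
    "KubernetesExecutor": {
        "request_memory": "256Mi",
        "volumes": [{"name": "scratch"}],
    }
}

merged = deep_merge(BASE_EXECUTOR_CONFIG, operator_config)
```

If base and operator volumes should both apply, the merge would need an explicit rule for lists (for example, concatenation); the proposal as written does not specify one.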
> Finally, we'll want to call the get_executor_config function in the `BaseOperator`, possibly here: https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348
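
As a rough sketch of that last step, the merge could run once in the operator constructor so that each task ends up with its own combined config. `SketchOperator` and `merge_configs` below are hypothetical stand-ins, not Airflow's `BaseOperator` or any existing helper, and the base config is hard-coded here where the proposal would load it from the `[core] base_executor_config` option.

```python
import copy
from collections.abc import Mapping


def merge_configs(base, override):
    """Deep-merge ``override`` into a copy of ``base`` and return the copy."""
    result = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(result.get(key), dict) and isinstance(value, Mapping):
            result[key] = merge_configs(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


class SketchOperator:
    """Hypothetical stand-in for BaseOperator; not Airflow's actual class."""

    # Assumption: the real proposal would import this from the path given
    # in the [core] base_executor_config option instead of hard-coding it.
    base_executor_config = {
        "KubernetesExecutor": {"image_pull_policy": "Always"}
    }

    def __init__(self, task_id, executor_config=None):
        self.task_id = task_id
        # Each operator gets an independent merged copy of the base config.
        self.executor_config = merge_configs(
            self.base_executor_config, executor_config or {})


small = SketchOperator("small", {"KubernetesExecutor": {"request_cpu": "100m"}})
plain = SketchOperator("plain")
```

Because the merge works on a copy, settings passed to one operator (here `request_cpu` on `small`) do not leak into operators created afterwards.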



--
This message was sent by Atlassian Jira
(v8.3.2#803003)