You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/01/07 23:05:00 UTC

[jira] [Commented] (AIRFLOW-3645) Use a base_executor_config and merge operator level executor_config

    [ https://issues.apache.org/jira/browse/AIRFLOW-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736462#comment-16736462 ] 

ASF GitHub Bot commented on AIRFLOW-3645:
-----------------------------------------

Mokubyow commented on pull request #4456: [AIRFLOW-3645] Add base_executor_config
URL: https://github.com/apache/airflow/pull/4456
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW-3645)
   
   ### Description
   
   - [x] Add a base_executor_config that merges any operator_level executor_config into itself. This helps to dry up KubernetesExecutor deployments that might need to pass an executor config to all operators.
   
   ### Tests
   
   - [x] My PR adds the following unit tests: `nosetests -v tests/utils/test_helpers.py:TestHelpers.test_dict_merge`
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes how to use it.
     - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.
     - All the public functions and the classes in the PR contain docstrings that explain what it does
   
   ### Code Quality
   
   - [x] Passes `flake8`
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Use a base_executor_config and merge operator level executor_config
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-3645
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Kyle Hamlin
>            Assignee: Kyle Hamlin
>            Priority: Major
>             Fix For: 1.10.2
>
>
> It would be very useful to have a `base_executor_config` and merge the base config with any operator level `executor_config`.
> I imaging referencing a python dict similar to how we reference a custom logging_config
> *Example config*
> {code:java}
> [core]
> base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
> {code}
> *Example base_executor_config*
> {code:java}
> BASE_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "image_pull_policy": "Always",
>         "annotations": {
>             "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
>         },
>         "volumes": [
>             {
>                 "name": "airflow-lib",
>                 "persistentVolumeClaim": {
>                     "claimName": "airflow-lib"
>                 }
>             }
>         ],
>         "volume_mounts": [
>             {
>                 "name": "airflow-lib",
>                 "mountPath": "/usr/local/airflow/lib",
>             }
>         ]
>     }
> }
> {code}
> *Example operator*
> {code:java}
> run_this = PythonOperator(
>     task_id='print_the_context',
>     provide_context=True,
>     python_callable=print_context,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "256Mi",
>             "request_cpu": "100m",
>             "limit_memory": "256Mi",
>             "limit_cpu": "100m"
>         }
>     },
>     dag=dag)
> {code}
> Then we'll want to have a dict deep merge function in that returns the executor_config
> *Merge functionality*
> {code:java}
> import collections
> from airflow import conf
> from airflow.utils.module_loading import import_string
> def dict_merge(dct, merge_dct):
>     """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
>     updating only top-level keys, dict_merge recurses down into dicts nested
>     to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
>     ``dct``.
>     :param dct: dict onto which the merge is executed
>     :param merge_dct: dct merged into dct
>     :return: dct
>     """
>     for k, v in merge_dct.items():
>         if (k in dct and isinstance(dct[k], dict)
>                 and isinstance(merge_dct[k], collections.Mapping)):
>             dict_merge(dct[k], merge_dct[k])
>         else:
>             dct[k] = merge_dct[k]
>     
>     return dct
> def get_executor_config(executor_config):
>     """Try to import base_executor_config and merge it with provided
>     executor_config.
>     :param executor_config: operator level executor config
>     :return: dict"""
>     
>     try:
>         base_executor_config = import_string(
>             conf.get('core', 'base_executor_config'))
>         merged_executor_config = dict_merge(
>             base_executor_config, executor_config)
>         return merged_executor_config
>     except Exception:
>         return executor_config
> {code}
> Finally, we'll want to call the get_executor_config function in the `BaseOperator` possibly here: https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)