Posted to commits@airflow.apache.org by "Kyle Hamlin (JIRA)" <ji...@apache.org> on 2019/01/07 18:24:00 UTC

[jira] [Created] (AIRFLOW-3645) Use a base_executor_config and merge operator level executor_config

Kyle Hamlin created AIRFLOW-3645:
------------------------------------

             Summary: Use a base_executor_config and merge operator level executor_config
                 Key: AIRFLOW-3645
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
             Project: Apache Airflow
          Issue Type: Improvement
            Reporter: Kyle Hamlin
             Fix For: 1.10.2


It would be very useful to have a `base_executor_config` and merge the base config with any operator level `executor_config`.

I imagine referencing a Python dict, similar to how we reference a custom logging_config.

*Example config*
{code:python}
[core]
base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
{code}
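For reference, the dotted-path lookup this relies on can be sketched with a stdlib-only helper in the spirit of Airflow's `airflow.utils.module_loading.import_string` (this is an illustrative sketch, not the real implementation):
{code:python}
from importlib import import_module


def import_string(dotted_path):
    """Resolve a dotted path such as
    'config.base_executor_config.BASE_EXECUTOR_CONFIG' to the attribute
    it names: import the module part, then fetch the final attribute."""
    module_path, _, attr_name = dotted_path.rpartition('.')
    module = import_module(module_path)
    return getattr(module, attr_name)
{code}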
*Example base_executor_config*
{code:python}
BASE_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "image_pull_policy": "Always",
        "annotations": {
            "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
        },
        "volumes": [
            {
                "name": "airflow-lib",
                "persistentVolumeClaim": {
                    "claimName": "airflow-lib"
                }
            }
        ],
        "volume_mounts": [
            {
                "name": "airflow-lib",
                "mountPath": "/usr/local/airflow/lib",
            }
        ]
    }
}
{code}
*Example operator*
{code:python}
run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "256Mi",
            "request_cpu": "100m",
            "limit_memory": "256Mi",
            "limit_cpu": "100m"
        }
    },
    dag=dag)
{code}
Then we'll want a dict deep-merge function that returns the merged executor_config.

*Merge functionality*
{code:python}
import collections.abc
from airflow import conf
from airflow.utils.module_loading import import_string

def dict_merge(dct, merge_dct):
    """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct``.
    :param dct: dict onto which the merge is executed
    :param merge_dct: dct merged into dct
    :return: dct
    """

    for k, v in merge_dct.items():
        if (k in dct and isinstance(dct[k], dict)
                and isinstance(v, collections.abc.Mapping)):
            dict_merge(dct[k], v)
        else:
            dct[k] = v

    return dct


def get_executor_config(executor_config):
    """Try to import base_executor_config and merge it with provided
    executor_config.
    :param executor_config: operator level executor config
    :return: dict"""
    
    try:
        base_executor_config = import_string(
            conf.get('core', 'base_executor_config'))
        merged_executor_config = dict_merge(
            base_executor_config, executor_config)
        return merged_executor_config
    except Exception:
        return executor_config
{code}
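To illustrate the intended merge semantics (task-level keys override, base-only keys survive), here is a self-contained sketch of the recursive merge applied to shortened versions of the example configs above:
{code:python}
import collections.abc


def dict_merge(dct, merge_dct):
    """Recursive dict merge: merge_dct wins on conflicts, nested dicts merge."""
    for k, v in merge_dct.items():
        if (k in dct and isinstance(dct[k], dict)
                and isinstance(v, collections.abc.Mapping)):
            dict_merge(dct[k], v)
        else:
            dct[k] = v
    return dct


base = {"KubernetesExecutor": {"image_pull_policy": "Always",
                               "volumes": [{"name": "airflow-lib"}]}}
task = {"KubernetesExecutor": {"request_memory": "256Mi",
                               "image_pull_policy": "IfNotPresent"}}
merged = dict_merge(base, task)
# Task-level "image_pull_policy" wins; base-only "volumes" survives.
{code}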

Finally, we'll want to call the get_executor_config function in the `BaseOperator` possibly here: https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348
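As a rough sketch of that wiring (everything here other than `executor_config` is illustrative; the stub below uses a hypothetical base config and a shallow per-executor update in place of the real conf-driven deep merge):
{code:python}
def get_executor_config(executor_config):
    # Stub standing in for the merge helper above: a hypothetical base
    # config plus a shallow per-executor update, for illustration only.
    base = {"KubernetesExecutor": {"image_pull_policy": "Always"}}
    for executor, opts in executor_config.items():
        base.setdefault(executor, {}).update(opts)
    return base


class BaseOperator:
    # Minimal stand-in for airflow.models.BaseOperator showing where the
    # merge could be applied to the task-level executor_config.
    def __init__(self, task_id, executor_config=None):
        self.task_id = task_id
        self.executor_config = get_executor_config(executor_config or {})


op = BaseOperator("t1", executor_config={"KubernetesExecutor":
                                         {"request_cpu": "100m"}})
# op.executor_config now carries both the base and the task-level keys.
{code}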



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)