Posted to commits@airflow.apache.org by "Kyle Hamlin (JIRA)" <ji...@apache.org> on 2019/01/07 22:39:00 UTC
[jira] [Work started] (AIRFLOW-3645) Use a base_executor_config and merge operator level executor_config
[ https://issues.apache.org/jira/browse/AIRFLOW-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on AIRFLOW-3645 started by Kyle Hamlin.
--------------------------------------------
> Use a base_executor_config and merge operator level executor_config
> -------------------------------------------------------------------
>
> Key: AIRFLOW-3645
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Kyle Hamlin
> Assignee: Kyle Hamlin
> Priority: Major
> Fix For: 1.10.2
>
>
> It would be very useful to have a `base_executor_config` that is deep-merged with any operator-level `executor_config`.
> I imagine referencing a Python dict by its dotted path, similar to how a custom logging config is referenced via `logging_config_class`.
> *Example config*
> {code:java}
> [core]
> base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
> {code}
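For reference, resolving a dotted path such as `config.base_executor_config.BASE_EXECUTOR_CONFIG` to a module attribute can be sketched with the standard library's `importlib`. This only approximates what `airflow.utils.module_loading.import_string` does and is not its actual source; the helper name `resolve_dotted_path` is hypothetical.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Hypothetical helper: import 'pkg.module.ATTR' and return ATTR.

    Splits on the last dot, imports the module part, then fetches the
    attribute. Raises ImportError if either step fails.
    """
    try:
        module_path, attr_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError("%s doesn't look like a module path" % dotted_path)
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            'Module "%s" does not define a "%s" attribute'
            % (module_path, attr_name))


# A stdlib object stands in for BASE_EXECUTOR_CONFIG here.
sep = resolve_dotted_path("os.sep")
```

With the setting from the ticket, the `config` package holding `base_executor_config.py` would have to be importable wherever the executor config is resolved.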
> *Example base_executor_config*
> {code:python}
> BASE_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "image_pull_policy": "Always",
>         "annotations": {
>             "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
>         },
>         "volumes": [
>             {
>                 "name": "airflow-lib",
>                 "persistentVolumeClaim": {
>                     "claimName": "airflow-lib"
>                 }
>             }
>         ],
>         "volume_mounts": [
>             {
>                 "name": "airflow-lib",
>                 "mountPath": "/usr/local/airflow/lib",
>             }
>         ]
>     }
> }
> {code}
> *Example operator*
> {code:python}
> run_this = PythonOperator(
>     task_id='print_the_context',
>     provide_context=True,
>     python_callable=print_context,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "256Mi",
>             "request_cpu": "100m",
>             "limit_memory": "256Mi",
>             "limit_cpu": "100m"
>         }
>     },
>     dag=dag)
> {code}
> Then we'll want a recursive dict-merge function, plus a helper that returns the merged executor_config:
> *Merge functionality*
> {code:python}
> import copy
> from collections.abc import Mapping
>
> from airflow.configuration import conf
> from airflow.exceptions import AirflowConfigException
> from airflow.utils.module_loading import import_string
>
>
> def dict_merge(dct, merge_dct):
>     """Recursive dict merge. Inspired by :meth:`dict.update`, but instead of
>     updating only top-level keys, dict_merge recurses into dicts nested to
>     an arbitrary depth, updating keys. ``merge_dct`` is merged into ``dct``
>     in place.
>
>     :param dct: dict onto which the merge is executed
>     :param merge_dct: dict merged into ``dct``
>     :return: dct
>     """
>     for k, v in merge_dct.items():
>         if k in dct and isinstance(dct[k], dict) and isinstance(v, Mapping):
>             dict_merge(dct[k], v)
>         else:
>             # Scalars and lists from merge_dct replace the base value.
>             dct[k] = v
>     return dct
>
>
> def get_executor_config(executor_config):
>     """Try to import base_executor_config and merge it with the provided
>     executor_config.
>
>     :param executor_config: operator-level executor config (may be None)
>     :return: dict
>     """
>     executor_config = executor_config or {}
>     try:
>         base_executor_config = import_string(
>             conf.get('core', 'base_executor_config'))
>     except (AirflowConfigException, ImportError):
>         # No base config is configured, or it cannot be imported.
>         return executor_config
>     # Deep-copy the base so the merge never mutates the shared module-level
>     # dict; otherwise one task's settings would leak into every later task.
>     return dict_merge(copy.deepcopy(base_executor_config), executor_config)
> {code}
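The merge semantics can be checked without an Airflow installation. The sketch below is a hypothetical, copy-returning variant of `dict_merge` (`merged_copy` is not part of Airflow), applied to trimmed-down versions of the two example configs above.

```python
import copy
from collections.abc import Mapping


def merged_copy(base, override):
    """Hypothetical helper: deep-merge `override` over a deep copy of `base`.

    Neither input is modified; nested dicts are merged key by key.
    """
    result = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(result.get(key), dict) and isinstance(value, Mapping):
            result[key] = merged_copy(result[key], value)
        else:
            # Scalars and lists are replaced wholesale, not combined.
            result[key] = copy.deepcopy(value)
    return result


BASE = {
    "KubernetesExecutor": {
        "image_pull_policy": "Always",
        "volumes": [{"name": "airflow-lib",
                     "persistentVolumeClaim": {"claimName": "airflow-lib"}}],
    }
}
TASK = {"KubernetesExecutor": {"request_memory": "256Mi", "limit_cpu": "100m"}}

# Holds image_pull_policy and volumes from BASE plus the two task-level keys.
merged = merged_copy(BASE, TASK)

# A task that sets its own list replaces the base list entirely.
override_volumes = merged_copy(BASE, {"KubernetesExecutor": {"volumes": []}})
```

Because lists are replaced rather than concatenated, a task that sets its own `volumes` would drop the base volumes instead of adding to them; whether that is the desired behavior is a design choice the ticket leaves open.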
> Finally, we'll want to call `get_executor_config` from `BaseOperator`, possibly here: https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348
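A minimal sketch of that hook, assuming the merge happens once in the operator's constructor. `OperatorSketch` and `_deep_merge` are hypothetical stand-ins, not Airflow's `BaseOperator`; the base dict is a class attribute only so the example runs without Airflow, whereas in the proposal it would come from the `[core] base_executor_config` setting.

```python
import copy


def _deep_merge(base, override):
    """Hypothetical helper: deep-merge `override` over a copy of `base`."""
    result = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(result.get(key), dict) and isinstance(value, dict):
            result[key] = _deep_merge(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


class OperatorSketch(object):
    """Hypothetical stand-in for BaseOperator; only executor_config is modeled."""

    # Stand-in for the dict that [core] base_executor_config would point to.
    base_executor_config = {
        "KubernetesExecutor": {"image_pull_policy": "Always"},
    }

    def __init__(self, task_id, executor_config=None):
        self.task_id = task_id
        # Treat a missing operator-level config as empty, then merge it over
        # the base so the executor only ever sees the combined dict.
        self.executor_config = _deep_merge(
            self.base_executor_config, executor_config or {})


plain = OperatorSketch("plain")
sized = OperatorSketch(
    "sized", executor_config={"KubernetesExecutor": {"request_cpu": "100m"}})
```

Here `plain.executor_config` is simply a copy of the base, while `sized.executor_config` also carries `request_cpu`. Merging in the constructor means the base config is read when the DAG file is parsed; merging later, when the executor builds the pod, would be a possible alternative.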
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)