Posted to commits@airflow.apache.org by "Kyle Hamlin (JIRA)" <ji...@apache.org> on 2019/01/07 18:24:00 UTC
[jira] [Created] (AIRFLOW-3645) Use a base_executor_config and merge operator level executor_config
Kyle Hamlin created AIRFLOW-3645:
------------------------------------
Summary: Use a base_executor_config and merge operator level executor_config
Key: AIRFLOW-3645
URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
Project: Apache Airflow
Issue Type: Improvement
Reporter: Kyle Hamlin
Fix For: 1.10.2
It would be very useful to have a `base_executor_config` and merge the base config with any operator level `executor_config`.
I imagine referencing a Python dict, similar to how we reference a custom logging_config.
*Example config*
{code}
[core]
base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
{code}
*Example base_executor_config*
{code:python}
BASE_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "image_pull_policy": "Always",
        "annotations": {
            "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
        },
        "volumes": [
            {
                "name": "airflow-lib",
                "persistentVolumeClaim": {
                    "claimName": "airflow-lib"
                }
            }
        ],
        "volume_mounts": [
            {
                "name": "airflow-lib",
                "mountPath": "/usr/local/airflow/lib"
            }
        ]
    }
}
{code}
*Example operator*
{code:python}
run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "256Mi",
            "request_cpu": "100m",
            "limit_memory": "256Mi",
            "limit_cpu": "100m"
        }
    },
    dag=dag)
{code}
Then we'll want a dict deep-merge function that returns the merged executor_config.
*Merge functionality*
{code:python}
from collections.abc import Mapping

from airflow import conf
from airflow.utils.module_loading import import_string


def dict_merge(dct, merge_dct):
    """Recursive dict merge. Inspired by :meth:``dict.update()``; instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct``.

    :param dct: dict onto which the merge is executed
    :param merge_dct: dict merged into dct
    :return: dct
    """
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, Mapping):
            dict_merge(dct[k], v)
        else:
            dct[k] = v
    return dct


def get_executor_config(executor_config):
    """Try to import base_executor_config and merge it with the provided
    executor_config.

    :param executor_config: operator-level executor config
    :return: dict
    """
    try:
        base_executor_config = import_string(
            conf.get('core', 'base_executor_config'))
        return dict_merge(base_executor_config, executor_config)
    except Exception:
        return executor_config
{code}
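For instance, deep-merging a minimal base config with an operator-level config keeps keys from both sides (a standalone sketch with `dict_merge` repeated so the snippet runs on its own; the tiny configs here are illustrative, not the full example above):

```python
from collections.abc import Mapping


def dict_merge(dct, merge_dct):
    """Recursively merge merge_dct into dct, as in the proposal above."""
    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, Mapping):
            dict_merge(dct[k], v)
        else:
            dct[k] = v
    return dct


base = {"KubernetesExecutor": {"image_pull_policy": "Always"}}
task_level = {"KubernetesExecutor": {"request_memory": "256Mi"}}

merged = dict_merge(base, task_level)
# Both keys survive under "KubernetesExecutor":
# {"KubernetesExecutor": {"image_pull_policy": "Always", "request_memory": "256Mi"}}
print(merged)
```

A plain `base.update(task_level)` would instead replace the entire "KubernetesExecutor" sub-dict, dropping `image_pull_policy`; that is why the recursion matters.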
Finally, we'll want to call the `get_executor_config` function in the `BaseOperator`, possibly here: https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348
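A rough sketch of how the `BaseOperator` hook might look (hypothetical; `get_executor_config` is stubbed here with a hard-coded base config so the snippet is self-contained, whereas the real helper would import the configured base_executor_config):

```python
def get_executor_config(executor_config):
    # Stub standing in for the proposed helper: in Airflow this would
    # load base_executor_config from [core] and deep-merge into it.
    base = {"KubernetesExecutor": {"image_pull_policy": "Always"}}
    for k, v in executor_config.items():
        base.setdefault(k, {}).update(v)
    return base


class BaseOperator:
    """Toy stand-in for airflow.models.BaseOperator."""

    def __init__(self, task_id, executor_config=None):
        self.task_id = task_id
        # Operator-level settings are layered over the base config.
        self.executor_config = get_executor_config(executor_config or {})


op = BaseOperator(
    "print_the_context",
    executor_config={"KubernetesExecutor": {"request_cpu": "100m"}})
# Merged result keeps the base image_pull_policy plus the task's request_cpu:
# {"image_pull_policy": "Always", "request_cpu": "100m"}
print(op.executor_config["KubernetesExecutor"])
```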
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)