Posted to commits@airflow.apache.org by "Sam Danbury (Jira)" <ji...@apache.org> on 2019/09/26 13:02:00 UTC
[jira] [Created] (AIRFLOW-5559) Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
Sam Danbury created AIRFLOW-5559:
------------------------------------
Summary: Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
Key: AIRFLOW-5559
URL: https://issues.apache.org/jira/browse/AIRFLOW-5559
Project: Apache Airflow
Issue Type: Bug
Components: executor-kubernetes
Affects Versions: 1.10.4
Reporter: Sam Danbury
Assignee: Daniel Imberman
When applying requests and limits to the worker pods spun up by the Kubernetes Executor (via the executor_config argument), the last operator to set resources wins: *all* worker pods in the DAG get that operator's settings applied, not just the pod for that single operator.
{code:python}
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(hours=1)
}

dag = DAG(
    'two_tasked_dag',
    start_date=datetime(2018, 8, 21),
    default_args=default_args,
    catchup=False
)

a = BashOperator(
    task_id="A",
    bash_command="""
    echo hello
    """,
    dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "700Mi",
            "request_cpu": "300m",
            "limit_memory": "900Mi",
            "limit_cpu": "500m",
        }
    }
)

b = BashOperator(
    task_id="B",
    bash_command="""
    echo hello
    """,
    dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "800Mi",
            "request_cpu": "200m",
            "limit_memory": "1Gi",
            "limit_cpu": "600m",
        }
    }
)
{code}
{code:bash}
$ kubectl get pods -n airflow
NAME                                             READY   STATUS    RESTARTS   AGE
airflow-postgresql-5bf5b6ddf5-f86sz              1/1     Running   0          22h
airflow-scheduler-6496dfd5db-cbhkc               2/2     Running   2          52m
airflow-web-7d76c9c4c-f7cpz                      3/3     Running   3          52m
twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6   1/1     Running   0          1m
twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e   1/1     Running   0          1m
$ kubectl describe pod -n airflow twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6
...
Limits:
  cpu:     600m
  memory:  1Gi
Requests:
  cpu:     200m
  memory:  800Mi
...
$ kubectl describe pod -n airflow twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e
...
Limits:
  cpu:     600m
  memory:  1Gi
Requests:
  cpu:     200m
  memory:  800Mi
...
{code}
The above shows that the last task in the DAG sets the resource requests and limits for the first task in the DAG, even though the two tasks have separate executor_configs. This also happens if task A has no executor_config set at all.
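A cheap parse-time sanity check (a hypothetical debugging aid, not part of the original report) is to confirm the two tasks really hold distinct executor_config objects; the sketch below uses plain dicts standing in for the two configs from the DAG above:

```python
# Plain-dict stand-ins for the executor_config of tasks A and B above.
config_a = {"KubernetesExecutor": {"request_memory": "700Mi", "request_cpu": "300m"}}
config_b = {"KubernetesExecutor": {"request_memory": "800Mi", "request_cpu": "200m"}}

# Distinct dict literals are distinct objects with distinct values, so the
# two operators should not be sharing one config at definition time.
print(config_a is config_b)  # False: separate objects
print(config_a == config_b)  # False: different values, as intended
```

In the real DAG file one could add `assert a.executor_config is not b.executor_config` after defining both tasks; if that assertion fails, the sharing happens at DAG definition or policy time rather than inside the executor.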
It is also worth mentioning (although maybe not relevant) that I am setting default pod resources using an airflow_local_settings.py file:
{code:python}
from airflow.utils.log.logging_mixin import LoggingMixin
from deepmerge import Merger

DEFAULT_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "request_memory": "250Mi",
        "request_cpu": "250m",
        "limit_memory": "500Mi",
        "limit_cpu": "500m",
    }
}

merger = Merger(
    type_strategies=[(list, ["append"]), (dict, ["merge"])],
    fallback_strategies=["override"],
    type_conflict_strategies=["override"],
)

log = LoggingMixin().log


def policy(ti):
    """Add the default executor config to all task instances."""
    executor_config = getattr(ti, "executor_config", {})
    if executor_config:
        log.debug(f"Merging {ti.task_id} executor config with default executor config")
        ti.executor_config = merger.merge(DEFAULT_EXECUTOR_CONFIG, executor_config)
        log.debug(ti.executor_config)
    else:
        log.debug(f"Applying default executor config to {ti.task_id}")
        ti.executor_config = DEFAULT_EXECUTOR_CONFIG
        log.debug(ti.executor_config)
{code}
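One thing worth checking with a policy like the above (a hedged aside, not confirmed as the root cause of this issue): merge helpers that write into their first argument mutate the shared module-level default, and tasks that receive the default by reference then all alias one dict, so the last merge appears to win everywhere. A minimal, self-contained sketch of that aliasing hazard, using a naive recursive merge in place of deepmerge:

```python
import copy

# Plain-dict stand-in for the module-level DEFAULT_EXECUTOR_CONFIG above.
DEFAULT_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {"request_cpu": "250m", "request_memory": "250Mi"}
}

def merge_in_place(base, override):
    """Naive recursive dict merge that mutates base, mimicking merge
    helpers that write into their first argument."""
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            merge_in_place(base[key], value)
        else:
            base[key] = value
    return base

# Task B's override is merged straight into the shared default...
merged = merge_in_place(
    DEFAULT_EXECUTOR_CONFIG,
    {"KubernetesExecutor": {"request_cpu": "200m"}},
)

# ...so the default now carries task B's value, and any later task that
# receives DEFAULT_EXECUTOR_CONFIG by reference sees "200m" as well.
print(DEFAULT_EXECUTOR_CONFIG["KubernetesExecutor"]["request_cpu"])  # 200m

# Defensive variant: deep-copy the default before merging or assigning,
# so per-task overrides never touch the shared dict.
safe = merge_in_place(
    copy.deepcopy(DEFAULT_EXECUTOR_CONFIG),
    {"KubernetesExecutor": {"request_cpu": "300m"}},
)
print(DEFAULT_EXECUTOR_CONFIG["KubernetesExecutor"]["request_cpu"])  # still 200m
```

Whether deepmerge's Merger behaves this way for the `"merge"` dict strategy, and whether the Kubernetes Executor itself also shares config objects, would need to be verified against the versions in use.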
--
This message was sent by Atlassian Jira
(v8.3.4#803005)