Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2019/09/27 10:14:00 UTC
[jira] [Commented] (AIRFLOW-5559) Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
[ https://issues.apache.org/jira/browse/AIRFLOW-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939297#comment-16939297 ]
Ash Berlin-Taylor commented on AIRFLOW-5559:
--------------------------------------------
I have a feeling that it is your policy that is causing this behaviour. Try without the policy, or use {{ti.executor_config = copy.deepcopy(...)}}
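The suspected mechanism can be reproduced without Airflow. This is a minimal sketch that assumes deepmerge's `Merger.merge(base, nxt)` merges `nxt` into `base` in place and returns `base` (the behaviour of its dict `merge` strategy). `merge_into`, `buggy_policy`, `fixed_policy` and `run` are hypothetical names, and plain dicts stand in for each task's `executor_config`. Because the reported policy either merges into the shared default or assigns it directly, every task ends up holding the same dict, so whichever task is processed last wins.

```python
import copy

# Per-task overrides from the issue's DAG (memory only, for brevity).
CONFIG_A = {"KubernetesExecutor": {"request_memory": "700Mi", "limit_memory": "900Mi"}}
CONFIG_B = {"KubernetesExecutor": {"request_memory": "800Mi", "limit_memory": "1Gi"}}


def merge_into(base, override):
    """Hypothetical stand-in for the deepmerge merger: recursively merges
    `override` into `base`, MUTATING `base`, and returns `base`."""
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            merge_into(base[key], value)
        else:
            base[key] = value
    return base


def buggy_policy(default, task_config):
    # Mirrors the reported policy: both branches hand back the shared default.
    if task_config:
        return merge_into(default, task_config)
    return default


def fixed_policy(default, task_config):
    # Suggested fix: merge into a private deep copy of the default.
    return merge_into(copy.deepcopy(default), task_config or {})


def run(policy_fn, config_a):
    """Apply `policy_fn` to task A, then task B, with one shared default dict."""
    default = {"KubernetesExecutor": {"request_memory": "250Mi", "limit_memory": "500Mi"}}
    task_a = policy_fn(default, config_a)
    task_b = policy_fn(default, CONFIG_B)
    return task_a, task_b


for policy_fn in (buggy_policy, fixed_policy):
    for config_a in (CONFIG_A, None):
        a, b = run(policy_fn, config_a)
        print(policy_fn.__name__, a["KubernetesExecutor"]["limit_memory"])
```

It prints `1Gi` for task A under `buggy_policy` whether or not A has its own overrides, and `900Mi` then `500Mi` under `fixed_policy`, which matches both symptoms in the report and the effect of the suggested `copy.deepcopy`.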
> Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
> ---------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-5559
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5559
> Project: Apache Airflow
> Issue Type: Bug
> Components: executor-kubernetes
> Affects Versions: 1.10.4
> Reporter: Sam Danbury
> Assignee: Daniel Imberman
> Priority: Minor
>
> When applying requests and limits to the worker pods spun up by the Kubernetes Executor (using the executor_config argument), the last operator to set the resources wins and *all* worker pods in the DAG get those settings, not just the pod for that single operator.
>
> The problematic DAG:
> {code:java}
> from datetime import datetime, timedelta
> from airflow.operators.bash_operator import BashOperator
> from airflow.models import DAG
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 2,
>     'retry_delay': timedelta(hours=1),
> }
>
> dag = DAG(
>     'two_tasked_dag',
>     start_date=datetime(2018, 8, 21),
>     default_args=default_args,
>     catchup=False,
> )
>
> a = BashOperator(
>     task_id="A",
>     bash_command="""
>     echo hello
>     """,
>     dag=dag,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "700Mi",
>             "request_cpu": "300m",
>             "limit_memory": "900Mi",
>             "limit_cpu": "500m",
>         }
>     },
> )
>
> b = BashOperator(
>     task_id="B",
>     bash_command="""
>     echo hello
>     """,
>     dag=dag,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "800Mi",
>             "request_cpu": "200m",
>             "limit_memory": "1Gi",
>             "limit_cpu": "600m",
>         }
>     },
> ){code}
>
> Pod descriptions showing the applied resource requests and limits:
> {code:java}
> $ kubectl get pods -n airflow
> NAME                                             READY   STATUS    RESTARTS   AGE
> airflow-postgresql-5bf5b6ddf5-f86sz              1/1     Running   0          22h
> airflow-scheduler-6496dfd5db-cbhkc               2/2     Running   2          52m
> airflow-web-7d76c9c4c-f7cpz                      3/3     Running   3          52m
> twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6   1/1     Running   0          1m
> twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e   1/1     Running   0          1m
> $ kubectl describe pod -n airflow twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6
> ...
>     Limits:
>       cpu:     600m
>       memory:  1Gi
>     Requests:
>       cpu:     200m
>       memory:  800Mi
> ...
> $ kubectl describe pod -n airflow twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e
> ...
>     Limits:
>       cpu:     600m
>       memory:  1Gi
>     Requests:
>       cpu:     200m
>       memory:  800Mi
> {code}
>
> The above shows that the last task in the DAG sets the resource requests and limits for the first task as well, even though each task has its own executor_config. The same happens if task A has no executor_config set at all.
>
> It is also worth mentioning (although maybe not relevant) that I am setting default pod resources using an airflow_local_settings.py file:
>
> {code:java}
> from airflow.utils.log.logging_mixin import LoggingMixin
> from deepmerge import Merger
>
> DEFAULT_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "request_memory": "250Mi",
>         "request_cpu": "250m",
>         "limit_memory": "500Mi",
>         "limit_cpu": "500m",
>     }
> }
>
> merger = Merger(
>     type_strategies=[(list, ["append"]), (dict, ["merge"])],
>     fallback_strategies=["override"],
>     type_conflict_strategies=["override"],
> )
>
> log = LoggingMixin().log
>
> def policy(ti):
>     """Add the default executor config to all task instances."""
>     executor_config = getattr(ti, "executor_config", {})
>     if executor_config:
>         log.debug(f"Merging {ti.task_id} executor config with default executor config")
>         ti.executor_config = merger.merge(DEFAULT_EXECUTOR_CONFIG, executor_config)
>         log.debug(ti.executor_config)
>     else:
>         log.debug(f"Applying default executor config to {ti.task_id}")
>         ti.executor_config = DEFAULT_EXECUTOR_CONFIG
>         log.debug(ti.executor_config)
> {code}
>
>
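A revised airflow_local_settings.py policy could look like the sketch below, which gives every task its own executor_config instead of a reference to the shared default. To keep it self-contained it swaps deepmerge for a hypothetical non-mutating helper `merged` (lists are replaced rather than appended, unlike the reporter's `(list, ["append"])` strategy) and drops the logging. With deepmerge itself, the same effect should come from passing `copy.deepcopy(DEFAULT_EXECUTOR_CONFIG)` as the first argument to `merger.merge` and assigning a deep copy in the `else` branch, as the comment suggests. The parameter is named `task` because Airflow 1.10 documents the cluster policy hook as `policy(task)`; `SimpleNamespace` objects stand in for operators.

```python
import copy
from types import SimpleNamespace

DEFAULT_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "request_memory": "250Mi",
        "request_cpu": "250m",
        "limit_memory": "500Mi",
        "limit_cpu": "500m",
    }
}


def merged(base, override):
    """Hypothetical helper: return a NEW dict with `override` recursively laid
    over `base`. Neither input is modified; lists are replaced, not appended."""
    result = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merged(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


def policy(task):
    """Give each task its own executor_config built from the defaults."""
    overrides = getattr(task, "executor_config", None) or {}
    task.executor_config = merged(DEFAULT_EXECUTOR_CONFIG, overrides)


# Stand-ins for operators; only the attributes the policy touches are set.
a = SimpleNamespace(task_id="A", executor_config={"KubernetesExecutor": {"limit_memory": "900Mi"}})
b = SimpleNamespace(task_id="B", executor_config={"KubernetesExecutor": {"limit_memory": "1Gi"}})
c = SimpleNamespace(task_id="C", executor_config={})
for task in (a, b, c):
    policy(task)
```

Afterwards task A keeps its 900Mi limit alongside the default 250m CPU request, task C gets an independent copy of the defaults, and DEFAULT_EXECUTOR_CONFIG itself is never modified.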
--
This message was sent by Atlassian Jira
(v8.3.4#803005)